Posted to commits@hive.apache.org by se...@apache.org on 2018/06/14 19:25:33 UTC
[2/2] hive git commit: HIVE-19838 : simplify & fix ColumnizedDeleteEventRegistry load loop (Sergey Shelukhin, reviewed by Teddy Choi)
HIVE-19838 : simplify & fix ColumnizedDeleteEventRegistry load loop (Sergey Shelukhin, reviewed by Teddy Choi)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88d0da45
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88d0da45
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88d0da45
Branch: refs/heads/branch-3
Commit: 88d0da45b30c5e1f19af45a33a7c891175f7c38e
Parents: 55bc285
Author: sergey <se...@apache.org>
Authored: Thu Jun 14 12:15:28 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Jun 14 12:15:52 2018 -0700
----------------------------------------------------------------------
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 159 ++++++++++---------
1 file changed, 83 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
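Background for the diff below: the old load loop buffered every (originalWriteId,
bucketProperty) pair into parallel arrays and compressed them into CompressedOwid
ranges in a second pass; the patch builds the ranges in a single pass, opening a
new range when the key changes and patching its exclusive toIndex when the next
range starts or the input ends. A minimal standalone sketch of that single-pass
pattern, using hypothetical names (Run, compressRuns) rather than Hive's actual
classes:

import java.util.ArrayList;
import java.util.List;

public class RunLengthSketch {
  // One run of identical (writeId, bucket) keys; mirrors CompressedOwid's
  // fromIndex (inclusive) / toIndex (exclusive) layout.
  static class Run {
    final long writeId;
    final int bucket;
    final int fromIndex; // inclusive
    int toIndex = -1;    // exclusive; patched when the run is finalized

    Run(long writeId, int bucket, int fromIndex) {
      this.writeId = writeId;
      this.bucket = bucket;
      this.fromIndex = fromIndex;
    }
  }

  // Single pass over key pairs that arrive in sorted order.
  static List<Run> compressRuns(long[] writeIds, int[] buckets) {
    List<Run> runs = new ArrayList<>();
    Run last = null;
    for (int i = 0; i < writeIds.length; ++i) {
      if (last == null || last.writeId != writeIds[i] || last.bucket != buckets[i]) {
        if (last != null) {
          last.toIndex = i; // finalize the previous run
        }
        last = new Run(writeIds[i], buckets[i], i);
        runs.add(last);
      }
    }
    if (last != null) {
      last.toIndex = writeIds.length; // finalize the trailing run
    }
    return runs;
  }

  public static void main(String[] args) {
    long[] writeIds = {1, 1, 1, 2, 2, 3};
    int[] buckets   = {0, 0, 1, 1, 1, 1};
    for (Run r : compressRuns(writeIds, buckets)) {
      System.out.println("writeId=" + r.writeId + ", bucket=" + r.bucket
          + ", [" + r.fromIndex + ", " + r.toIndex + ")");
    }
    // Prints four runs: [0,2), [2,3), [3,5), [5,6).
  }
}
----------------------------------------------------------------------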
http://git-wip-us.apache.org/repos/asf/hive/blob/88d0da45/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index a4568de..66ffcae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
+import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -859,9 +861,11 @@ public class VectorizedOrcAcidRowBatchReader
private final ValidWriteIdList validWriteIdList;
private boolean isBucketPropertyRepeating;
private final boolean isBucketedTable;
+ private final Reader reader;
DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
ValidWriteIdList validWriteIdList, boolean isBucketedTable) throws IOException {
+ this.reader = deleteDeltaReader;
this.recordReader = deleteDeltaReader.rowsOptions(readerOptions);
this.bucketForSplit = bucket;
this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -955,6 +959,12 @@ public class VectorizedOrcAcidRowBatchReader
" from " + dummy + " curTxnId: " + curTxnId);
}
}
+
+ @Override
+ public String toString() {
+ return "{reader=" + reader + ", isBucketPropertyRepeating=" + isBucketPropertyRepeating +
+ ", bucketForSplit=" + bucketForSplit + ", isBucketedTable=" + isBucketedTable + "}";
+ }
}
/**
* A CompressedOwid class stores a compressed representation of the original
@@ -968,7 +978,7 @@ public class VectorizedOrcAcidRowBatchReader
final long originalWriteId;
final int bucketProperty;
final int fromIndex; // inclusive
- final int toIndex; // exclusive
+ int toIndex; // exclusive
CompressedOwid(long owid, int bucketProperty, int fromIndex, int toIndex) {
this.originalWriteId = owid;
@@ -1027,49 +1037,50 @@ public class VectorizedOrcAcidRowBatchReader
int totalDeleteEventCount = 0;
for (Path deleteDeltaDir : deleteDeltaDirs) {
FileSystem fs = deleteDeltaDir.getFileSystem(conf);
- for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf,
- new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) {
-          // NOTE: Calling getLastFlushLength below is more for future-proofing when we have
- // streaming deletes. But currently we don't support streaming deletes, and this can
- // be removed if this becomes a performance issue.
- long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
- // NOTE: A check for existence of deleteDeltaFile is required because we may not have
- // deletes for the bucket being taken into consideration for this split processing.
- if (length != -1 && fs.exists(deleteDeltaFile)) {
- Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
- OrcFile.readerOptions(conf).maxLength(length));
- AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
- if (acidStats.deletes == 0) {
-            continue; // just a safety check to ensure that we are not reading empty delete files.
- }
- totalDeleteEventCount += acidStats.deletes;
- if (totalDeleteEventCount > maxEventsInMemory) {
- // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
- // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
- // prevents creation of an object of this class if the total number of delete events
-          // exceeds this value. By default, it has been set to 10 million delete events per bucket.
- LOG.info("Total number of delete events exceeds the maximum number of delete events "
- + "that can be loaded into memory for the delete deltas in the directory at : "
- + deleteDeltaDirs.toString() +". The max limit is currently set at "
- + maxEventsInMemory + " and can be changed by setting the Hive config variable "
- + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
- throw new DeleteEventsOverflowMemoryException();
- }
- DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
- readerOptions, bucket, validWriteIdList, isBucketedTable);
- DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
- if (deleteReaderValue.next(deleteRecordKey)) {
- sortMerger.put(deleteRecordKey, deleteReaderValue);
- } else {
- deleteReaderValue.close();
+ Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
+ conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+ for (Path deleteDeltaFile : deleteDeltaFiles) {
+        // NOTE: Calling getLastFlushLength below is more for future-proofing when we have
+ // streaming deletes. But currently we don't support streaming deletes, and this can
+ // be removed if this becomes a performance issue.
+ long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+ // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+ // deletes for the bucket being taken into consideration for this split processing.
+ if (length != -1 && fs.exists(deleteDeltaFile)) {
+ Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+ OrcFile.readerOptions(conf).maxLength(length));
+ AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+ if (acidStats.deletes == 0) {
+            continue; // just a safety check to ensure that we are not reading empty delete files.
+ }
+ totalDeleteEventCount += acidStats.deletes;
+ if (totalDeleteEventCount > maxEventsInMemory) {
+ // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+ // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+ // prevents creation of an object of this class if the total number of delete events
+            // exceeds this value. By default, it has been set to 10 million delete events per bucket.
+ LOG.info("Total number of delete events exceeds the maximum number of delete events "
+ + "that can be loaded into memory for the delete deltas in the directory at : "
+ + deleteDeltaDirs.toString() +". The max limit is currently set at "
+ + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+ + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+ throw new DeleteEventsOverflowMemoryException();
+ }
+ DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+ readerOptions, bucket, validWriteIdList, isBucketedTable);
+ DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+ if (deleteReaderValue.next(deleteRecordKey)) {
+ sortMerger.put(deleteRecordKey, deleteReaderValue);
+ } else {
+ deleteReaderValue.close();
+ }
}
}
}
- }
+      // Note: totalDeleteEventCount can actually be higher than the real value.
+ // We assume here it won't be lower. Maybe we should just read and not guess...
if (totalDeleteEventCount > 0) {
- // Initialize the rowId array when we have some delete events.
- rowIds = new long[totalDeleteEventCount];
- readAllDeleteEventsFromDeleteDeltas();
+ readAllDeleteEventsFromDeleteDeltas(totalDeleteEventCount);
}
}
isEmpty = compressedOwids == null || rowIds == null;
@@ -1087,15 +1098,21 @@ public class VectorizedOrcAcidRowBatchReader
   * In practice we should be filtering delete events by min/max ROW_ID from the split. The latter
* is also not yet implemented: HIVE-16812.
*/
- private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
- if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
- int distinctOwids = 0;
- long lastSeenOwid = -1;
- int lastSeenBucketProperty = -1;
- long owids[] = new long[rowIds.length];
- int[] bucketProperties = new int [rowIds.length];
-
+ private void readAllDeleteEventsFromDeleteDeltas(
+ int totalDeleteEventCount) throws IOException {
+ if (sortMerger == null || sortMerger.isEmpty()) {
+ rowIds = new long[0];
+ return; // trivial case, nothing to read.
+ }
+
+ // Initialize the rowId array when we have some delete events.
+ rowIds = new long[totalDeleteEventCount];
+
int index = 0;
+    // We compress the owids into the CompressedOwid data structure that records
+ // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
+ List<CompressedOwid> compressedOwids = new ArrayList<>();
+ CompressedOwid lastCo = null;
while (!sortMerger.isEmpty()) {
// The sortMerger is a heap data structure that stores a pair of
// (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey.
@@ -1109,45 +1126,35 @@ public class VectorizedOrcAcidRowBatchReader
Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
DeleteRecordKey deleteRecordKey = entry.getKey();
DeleteReaderValue deleteReaderValue = entry.getValue();
- owids[index] = deleteRecordKey.originalWriteId;
- bucketProperties[index] = deleteRecordKey.bucketProperty;
+ long owid = deleteRecordKey.originalWriteId;
+ int bp = deleteRecordKey.bucketProperty;
rowIds[index] = deleteRecordKey.rowId;
- ++index;
- if (lastSeenOwid != deleteRecordKey.originalWriteId ||
- lastSeenBucketProperty != deleteRecordKey.bucketProperty) {
- ++distinctOwids;
- lastSeenOwid = deleteRecordKey.originalWriteId;
- lastSeenBucketProperty = deleteRecordKey.bucketProperty;
+ if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) {
+ if (lastCo != null) {
+ lastCo.toIndex = index; // Finalize the previous record.
+ }
+ lastCo = new CompressedOwid(owid, bp, index, -1);
+ compressedOwids.add(lastCo);
}
+ ++index;
if (deleteReaderValue.next(deleteRecordKey)) {
sortMerger.put(deleteRecordKey, deleteReaderValue);
} else {
deleteReaderValue.close(); // Exhausted reading all records, close the reader.
}
}
-
- // Once we have processed all the delete events and seen all the distinct owids,
-    // we compress the owids into the CompressedOwid data structure that records
- // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
- this.compressedOwids = new CompressedOwid[distinctOwids];
- lastSeenOwid = owids[0];
- lastSeenBucketProperty = bucketProperties[0];
- int fromIndex = 0, pos = 0;
- for (int i = 1; i < owids.length; ++i) {
- if (owids[i] != lastSeenOwid || lastSeenBucketProperty != bucketProperties[i]) {
- compressedOwids[pos] =
- new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, i);
- lastSeenOwid = owids[i];
- lastSeenBucketProperty = bucketProperties[i];
- fromIndex = i;
- ++pos;
- }
+ if (lastCo != null) {
+ lastCo.toIndex = index; // Finalize the last record.
+ lastCo = null;
+ }
+ if (rowIds.length > index) {
+ rowIds = Arrays.copyOf(rowIds, index);
}
- // account for the last distinct owid
- compressedOwids[pos] =
- new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, owids.length);
+
+ this.compressedOwids = compressedOwids.toArray(new CompressedOwid[compressedOwids.size()]);
}
+
private boolean isDeleted(long owid, int bucketProperty, long rowId) {
if (compressedOwids == null || rowIds == null) {
return false;
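----------------------------------------------------------------------
The load loop above drains a TreeMap-based sort-merger: it polls the smallest
delete key, consumes it, and re-inserts the reader that produced it under its
next key, until every reader is exhausted. A minimal sketch of that k-way merge
pattern over plain iterators (hypothetical names; not Hive's DeleteRecordKey /
DeleteReaderValue classes, and assuming distinct keys, as with ROW__IDs):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KWayMergeSketch {
  static List<Long> mergeSorted(List<Iterator<Long>> sources) {
    // The TreeMap keeps the current head key of each source in sorted order.
    TreeMap<Long, Iterator<Long>> heads = new TreeMap<>();
    for (Iterator<Long> it : sources) {
      if (it.hasNext()) {
        heads.put(it.next(), it);
      }
    }
    List<Long> merged = new ArrayList<>();
    while (!heads.isEmpty()) {
      // Poll the globally smallest key, like sortMerger.pollFirstEntry().
      Map.Entry<Long, Iterator<Long>> smallest = heads.pollFirstEntry();
      merged.add(smallest.getKey());
      Iterator<Long> it = smallest.getValue();
      if (it.hasNext()) {
        heads.put(it.next(), it); // advance the source that was consumed
      } // else: source exhausted; Hive closes the underlying reader here
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Iterator<Long>> sources = new ArrayList<>();
    sources.add(Arrays.asList(1L, 4L, 7L).iterator());
    sources.add(Arrays.asList(2L, 5L, 8L).iterator());
    sources.add(Arrays.asList(3L, 6L, 9L).iterator());
    System.out.println(mergeSorted(sources)); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}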