Posted to commits@hive.apache.org by se...@apache.org on 2018/06/14 19:25:33 UTC

[2/2] hive git commit: HIVE-19838 : simplify & fix ColumnizedDeleteEventRegistry load loop (Sergey Shelukhin, reviewed by Teddy Choi)

HIVE-19838 : simplify & fix ColumnizedDeleteEventRegistry load loop (Sergey Shelukhin, reviewed by Teddy Choi)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88d0da45
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88d0da45
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88d0da45

Branch: refs/heads/branch-3
Commit: 88d0da45b30c5e1f19af45a33a7c891175f7c38e
Parents: 55bc285
Author: sergey <se...@apache.org>
Authored: Thu Jun 14 12:15:28 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Jun 14 12:15:52 2018 -0700

----------------------------------------------------------------------
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 159 ++++++++++---------
 1 file changed, 83 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
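For readers skimming the diff below: the patch replaces the old two-pass approach (collect parallel owid/bucketProperty arrays while draining the heap, then compress them afterwards) with a single pass that builds the compressed (originalWriteId, bucketProperty) ranges as the sort-merge heap is drained, and it now sizes and, when the estimate was high, truncates the rowIds array from the possibly overestimated delete-event count. The standalone sketch below illustrates just the run-length grouping idea; the class and method names are simplified stand-ins for illustration, not the actual Hive types.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Hive types; names here are illustrative only.
public class RunLengthSketch {
  static class CompressedOwid {
    final long originalWriteId;
    final int bucketProperty;
    final int fromIndex; // inclusive
    int toIndex;         // exclusive, finalized when the run ends

    CompressedOwid(long owid, int bucketProperty, int fromIndex, int toIndex) {
      this.originalWriteId = owid;
      this.bucketProperty = bucketProperty;
      this.fromIndex = fromIndex;
      this.toIndex = toIndex;
    }
  }

  // The input is assumed to be pre-sorted by (owid, bucketProperty, rowId),
  // which is what the sort-merge heap guarantees in the real reader.
  static List<CompressedOwid> compress(long[] owids, int[] buckets) {
    List<CompressedOwid> result = new ArrayList<>();
    CompressedOwid lastCo = null;
    for (int index = 0; index < owids.length; ++index) {
      long owid = owids[index];
      int bp = buckets[index];
      if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) {
        if (lastCo != null) {
          lastCo.toIndex = index; // finalize the previous run
        }
        lastCo = new CompressedOwid(owid, bp, index, -1);
        result.add(lastCo);
      }
    }
    if (lastCo != null) {
      lastCo.toIndex = owids.length; // finalize the last run
    }
    return result;
  }

  public static void main(String[] args) {
    long[] owids = {5, 5, 5, 7, 7, 9};
    int[] buckets = {1, 1, 2, 2, 2, 2};
    for (CompressedOwid co : compress(owids, buckets)) {
      System.out.println(co.originalWriteId + "/" + co.bucketProperty
          + " -> [" + co.fromIndex + ", " + co.toIndex + ")");
    }
  }
}

Run as-is, this prints one [fromIndex, toIndex) range per distinct (originalWriteId, bucketProperty) pair, which matches the layout the registry keeps in compressedOwids alongside the sorted rowIds array.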


http://git-wip-us.apache.org/repos/asf/hive/blob/88d0da45/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index a4568de..66ffcae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -859,9 +861,11 @@ public class VectorizedOrcAcidRowBatchReader
       private final ValidWriteIdList validWriteIdList;
       private boolean isBucketPropertyRepeating;
       private final boolean isBucketedTable;
+      private final Reader reader;
 
       DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
           ValidWriteIdList validWriteIdList, boolean isBucketedTable) throws IOException {
+        this.reader = deleteDeltaReader;
         this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
         this.bucketForSplit = bucket;
         this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -955,6 +959,12 @@ public class VectorizedOrcAcidRowBatchReader
             " from " + dummy + " curTxnId: " + curTxnId);
         }
       }
+
+      @Override
+      public String toString() {
+        return "{reader=" + reader + ", isBucketPropertyRepeating=" + isBucketPropertyRepeating +
+            ", bucketForSplit=" + bucketForSplit + ", isBucketedTable=" + isBucketedTable + "}";
+      }
     }
     /**
      * A CompressedOwid class stores a compressed representation of the original
@@ -968,7 +978,7 @@ public class VectorizedOrcAcidRowBatchReader
       final long originalWriteId;
       final int bucketProperty;
       final int fromIndex; // inclusive
-      final int toIndex; // exclusive
+      int toIndex; // exclusive
 
       CompressedOwid(long owid, int bucketProperty, int fromIndex, int toIndex) {
         this.originalWriteId = owid;
@@ -1027,49 +1037,50 @@ public class VectorizedOrcAcidRowBatchReader
           int totalDeleteEventCount = 0;
           for (Path deleteDeltaDir : deleteDeltaDirs) {
             FileSystem fs = deleteDeltaDir.getFileSystem(conf);
-            for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf,
-              new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) {
-            // NOTE: Calling last flush length below is more for future-proofing when we have
-            // streaming deletes. But currently we don't support streaming deletes, and this can
-            // be removed if this becomes a performance issue.
-            long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
-            // NOTE: A check for existence of deleteDeltaFile is required because we may not have
-            // deletes for the bucket being taken into consideration for this split processing.
-            if (length != -1 && fs.exists(deleteDeltaFile)) {
-              Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
-                  OrcFile.readerOptions(conf).maxLength(length));
-              AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
-              if (acidStats.deletes == 0) {
-                continue; // just a safe check to ensure that we are not reading empty delete files.
-              }
-              totalDeleteEventCount += acidStats.deletes;
-              if (totalDeleteEventCount > maxEventsInMemory) {
-                // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
-                // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
-                // prevents creation of an object of this class if the total number of delete events
-                // exceeds this value. By default, it has been set to 10 million delete events per bucket.
-                LOG.info("Total number of delete events exceeds the maximum number of delete events "
-                    + "that can be loaded into memory for the delete deltas in the directory at : "
-                    + deleteDeltaDirs.toString() +". The max limit is currently set at "
-                    + maxEventsInMemory + " and can be changed by setting the Hive config variable "
-                    + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
-                throw new DeleteEventsOverflowMemoryException();
-              }
-              DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                  readerOptions, bucket, validWriteIdList, isBucketedTable);
-              DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-              if (deleteReaderValue.next(deleteRecordKey)) {
-                sortMerger.put(deleteRecordKey, deleteReaderValue);
-              } else {
-                deleteReaderValue.close();
+            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
+                conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+            for (Path deleteDeltaFile : deleteDeltaFiles) {
+              // NOTE: Calling last flush length below is more for future-proofing when we have
+              // streaming deletes. But currently we don't support streaming deletes, and this can
+              // be removed if this becomes a performance issue.
+              long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+              // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+              // deletes for the bucket being taken into consideration for this split processing.
+              if (length != -1 && fs.exists(deleteDeltaFile)) {
+                Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+                    OrcFile.readerOptions(conf).maxLength(length));
+                AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+                if (acidStats.deletes == 0) {
+                  continue; // just a safe check to ensure that we are not reading empty delete files.
+                }
+                totalDeleteEventCount += acidStats.deletes;
+                if (totalDeleteEventCount > maxEventsInMemory) {
+                  // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+                  // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+                  // prevents creation of an object of this class if the total number of delete events
+                  // exceeds this value. By default, it has been set to 10 million delete events per bucket.
+                  LOG.info("Total number of delete events exceeds the maximum number of delete events "
+                      + "that can be loaded into memory for the delete deltas in the directory at : "
+                      + deleteDeltaDirs.toString() +". The max limit is currently set at "
+                      + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+                      + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+                  throw new DeleteEventsOverflowMemoryException();
+                }
+                DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+                    readerOptions, bucket, validWriteIdList, isBucketedTable);
+                DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+                if (deleteReaderValue.next(deleteRecordKey)) {
+                  sortMerger.put(deleteRecordKey, deleteReaderValue);
+                } else {
+                  deleteReaderValue.close();
+                }
               }
             }
           }
-          }
+          // Note: totalDeleteEventCount can actually be higher than the real value.
+          //       We assume here it won't be lower. Maybe we should just read and not guess...
           if (totalDeleteEventCount > 0) {
-            // Initialize the rowId array when we have some delete events.
-            rowIds = new long[totalDeleteEventCount];
-            readAllDeleteEventsFromDeleteDeltas();
+            readAllDeleteEventsFromDeleteDeltas(totalDeleteEventCount);
           }
         }
         isEmpty = compressedOwids == null || rowIds == null;
@@ -1087,15 +1098,21 @@ public class VectorizedOrcAcidRowBatchReader
 * In practice we should be filtering delete events by min/max ROW_ID from the split.  The latter
 * is also not yet implemented: HIVE-16812.
      */
-    private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
-      if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
-      int distinctOwids = 0;
-      long lastSeenOwid = -1;
-      int lastSeenBucketProperty = -1;
-      long owids[] = new long[rowIds.length];
-      int[] bucketProperties = new int [rowIds.length];
-      
+    private void readAllDeleteEventsFromDeleteDeltas(
+        int totalDeleteEventCount) throws IOException {
+      if (sortMerger == null || sortMerger.isEmpty()) {
+        rowIds = new long[0];
+        return; // trivial case, nothing to read.
+      }
+
+      // Initialize the rowId array when we have some delete events.
+      rowIds = new long[totalDeleteEventCount];
+
       int index = 0;
+      // We compress the owids into CompressedOwid data structure that records
+      // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
+      List<CompressedOwid> compressedOwids = new ArrayList<>();
+      CompressedOwid lastCo = null;
       while (!sortMerger.isEmpty()) {
         // The sortMerger is a heap data structure that stores a pair of
         // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey.
@@ -1109,45 +1126,35 @@ public class VectorizedOrcAcidRowBatchReader
         Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
         DeleteRecordKey deleteRecordKey = entry.getKey();
         DeleteReaderValue deleteReaderValue = entry.getValue();
-        owids[index] = deleteRecordKey.originalWriteId;
-        bucketProperties[index] = deleteRecordKey.bucketProperty;
+        long owid = deleteRecordKey.originalWriteId;
+        int bp = deleteRecordKey.bucketProperty;
         rowIds[index] = deleteRecordKey.rowId;
-        ++index;
-        if (lastSeenOwid != deleteRecordKey.originalWriteId ||
-          lastSeenBucketProperty != deleteRecordKey.bucketProperty) {
-          ++distinctOwids;
-          lastSeenOwid = deleteRecordKey.originalWriteId;
-          lastSeenBucketProperty = deleteRecordKey.bucketProperty;
+        if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) {
+          if (lastCo != null) {
+            lastCo.toIndex = index; // Finalize the previous record.
+          }
+          lastCo = new CompressedOwid(owid, bp, index, -1);
+          compressedOwids.add(lastCo);
         }
+        ++index;
         if (deleteReaderValue.next(deleteRecordKey)) {
           sortMerger.put(deleteRecordKey, deleteReaderValue);
         } else {
           deleteReaderValue.close(); // Exhausted reading all records, close the reader.
         }
       }
-
-      // Once we have processed all the delete events and seen all the distinct owids,
-      // we compress the owids into CompressedOwid data structure that records
-      // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
-      this.compressedOwids = new CompressedOwid[distinctOwids];
-      lastSeenOwid = owids[0];
-      lastSeenBucketProperty = bucketProperties[0];
-      int fromIndex = 0, pos = 0;
-      for (int i = 1; i < owids.length; ++i) {
-        if (owids[i] != lastSeenOwid || lastSeenBucketProperty != bucketProperties[i]) {
-          compressedOwids[pos] =
-            new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, i);
-          lastSeenOwid = owids[i];
-          lastSeenBucketProperty = bucketProperties[i];
-          fromIndex = i;
-          ++pos;
-        }
+      if (lastCo != null) {
+        lastCo.toIndex = index; // Finalize the last record.
+        lastCo = null;
+      }
+      if (rowIds.length > index) {
+        rowIds = Arrays.copyOf(rowIds, index);
       }
-      // account for the last distinct owid
-      compressedOwids[pos] =
-        new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, owids.length);
+
+      this.compressedOwids = compressedOwids.toArray(new CompressedOwid[compressedOwids.size()]);
     }
 
+
     private boolean isDeleted(long owid, int bucketProperty, long rowId) {
       if (compressedOwids == null || rowIds == null) {
         return false;