Posted to commits@hive.apache.org by ek...@apache.org on 2018/06/09 16:30:30 UTC

hive git commit: OrcRawRecordMerger doesn't work for more than one file in the non-vectorized case (Sergey Shelukhin, reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/branch-3 1c5ce3f61 -> 7981d38cb


OrcRawRecordMerger doesn't work for more than one file in the non-vectorized case (Sergey Shelukhin, reviewed by Eugene Koifman)
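
Reading the OrcRawRecordMerger hunks below, the failure mode appears to be this: the merger collects its ReaderPairs in a sorted map keyed by ReaderKey (note the readers.pollFirstEntry() call in the last file), and the old code reused one mutable ReaderKey instance for every put. Each new reader pair advances that shared key in place, so with more than one file a later put found the same object already in the map and replaced the earlier entry, silently dropping a reader. The patch allocates a fresh ReaderKey after every successful put and routes all puts through a new ensurePutReader() helper that fails fast if two readers ever land on the same key. A minimal, self-contained sketch of the underlying pitfall (MutableKey is a hypothetical stand-in for ReaderKey, the strings for ReaderPairs):

  import java.util.TreeMap;

  public class MutableKeyPitfall {
    // Hypothetical stand-in for ReaderKey: mutable and Comparable.
    static final class MutableKey implements Comparable<MutableKey> {
      long rowId;
      MutableKey(long rowId) { this.rowId = rowId; }
      @Override public int compareTo(MutableKey o) { return Long.compare(rowId, o.rowId); }
    }

    public static void main(String[] args) {
      TreeMap<MutableKey, String> readers = new TreeMap<>();
      MutableKey key = new MutableKey(1);
      readers.put(key, "reader for file 1");
      key.rowId = 2;                          // the next file advances the shared key in place...
      readers.put(key, "reader for file 2");  // ...so this put replaces the first entry
      System.out.println(readers.size());     // prints 1, not 2 -- a reader was silently lost
    }
  }

With the patch, the aliasing is gone (every reader gets its own key object), and ensurePutReader() turns any remaining collision into an IOException instead of silent data loss.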


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7981d38c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7981d38c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7981d38c

Branch: refs/heads/branch-3
Commit: 7981d38cbf6ef43384f5fb51b1560d7cf5add729
Parents: 1c5ce3f
Author: sergey <se...@apache.org>
Authored: Sat Jun 9 09:10:15 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Jun 9 09:10:15 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/RecordIdentifier.java     |  1 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  6 ++--
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 38 ++++++++++++++------
 3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7981d38c/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
index 607abfd..ea7ba53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -226,6 +226,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
     return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}";
   }
   protected String bucketToString() {
+    if (bucketId == -1) return ("bucket: " + bucketId);
     BucketCodec codec =
       BucketCodec.determineVersion(bucketId);
     return  "bucket: " + bucketId + "(" + codec.getVersion() + "." +
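
A note on this guard: BucketCodec.determineVersion() decodes the codec version from the top bits of the bucket property and rejects values it cannot decode, and -1 (an unset bucket id) has all bits set, so the unguarded toString() could throw exactly when a key is being printed for diagnostics, including the new reader-collision error message below. A rough illustration (the three-argument RecordIdentifier constructor and the exact output are assumed for the example):

  // Assumed constructor shape: RecordIdentifier(writeId, bucketId, rowId).
  RecordIdentifier rid = new RecordIdentifier(1L, -1, 0L);
  // With the guard, the unset bucket id is printed verbatim...
  System.out.println(rid);  // {originalWriteId: 1, bucket: -1, row: 0}
  // ...instead of reaching BucketCodec.determineVersion(-1), which would throw.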

http://git-wip-us.apache.org/repos/asf/hive/blob/7981d38c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 694cf75..732e233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2076,9 +2076,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
     ValidWriteIdList validWriteIdList
             = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
-    LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
             + " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN));
-
+      LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas));
+    }
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
             validWriteIdList, readOptions, deltas, mergerOptions);
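
This hunk is a logging cleanup, but the guard does real work: slf4j defers message formatting for {} placeholders, yet the argument expressions themselves are ordinary Java and run eagerly, so both the string concatenation and Arrays.toString(deltas) would execute even with DEBUG off. A sketch of the distinction, reusing the identifiers from the method above:

  // Formatting of the message is deferred, but Arrays.toString(deltas)
  // still runs eagerly at any log level:
  LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas));

  // Hence the explicit guard: nothing here executes unless DEBUG is on.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas));
  }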

http://git-wip-us.apache.org/repos/asf/hive/blob/7981d38c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 8c7c72e..9d954ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -249,6 +249,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       } while (nextRecord() != null &&
         (minKey != null && key.compareRow(getMinKey()) <= 0));
     }
+    @Override
+    public String toString() {
+      return "[key=" + key + ", nextRecord=" + nextRecord + ", reader=" + reader + "]";
+    }
     @Override public final OrcStruct nextRecord() {
       return nextRecord;
     }
@@ -281,7 +285,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             OrcRecordUpdater.getRowId(nextRecord()),
             OrcRecordUpdater.getCurrentTransaction(nextRecord()),
             OrcRecordUpdater.getOperation(nextRecord()) == OrcRecordUpdater.DELETE_OPERATION);
-
         // if this record is larger than maxKey, we need to stop
         if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
           LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
@@ -999,7 +1002,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // use the min/max instead of the byte range
       ReaderPair pair = null;
-      ReaderKey key = new ReaderKey();
+      ReaderKey baseKey = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
         if(mergerOptions.isCompacting()) {
@@ -1009,7 +1012,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
               AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
           }
-          pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+          pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions,
             conf, validWriteIdList,
             0);//0 since base_x doesn't have a suffix (neither does pre acid write)
         } else {
@@ -1024,7 +1027,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
               tfp.syntheticWriteId, tfp.folder);
           }
-          pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
+          pair = new OriginalReaderPairToRead(baseKey, reader, bucket, keyInterval.getMinKey(),
             keyInterval.getMaxKey(), options,  readerPairOptions, conf, validWriteIdList, tfp.statementId);
         }
       } else {
@@ -1039,7 +1042,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            //doing major compaction - it's possible, where a full complement of bucket files is
            //not required (on Tez), that base_x/ doesn't have a file for 'bucket'
             reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
-            pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+            pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
               eventOptions);
           }
           else {
@@ -1049,7 +1052,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         }
         else {
           assert reader != null : "no reader? " + mergerOptions.getRootPath();
-          pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+          pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
             eventOptions);
         }
       }
@@ -1058,7 +1061,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // if there is at least one record, put it in the map
       if (pair.nextRecord() != null) {
-        readers.put(key, pair);
+        ensurePutReader(baseKey, pair);
+        baseKey = null;
       }
       baseReader = pair.getRecordReader();
     }
@@ -1088,7 +1092,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ReaderPair deltaPair =  new OriginalReaderPairToCompact(key, bucket, options,
               rawCompactOptions, conf, validWriteIdList, deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
-            readers.put(key, deltaPair);
+            ensurePutReader(key, deltaPair);
+            key = new ReaderKey();
           }
           continue;
         }
@@ -1101,6 +1106,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              */
             continue;
           }
+          LOG.debug("Looking at delta file {}", deltaFile);
           if(deltaDir.isDeleteDelta()) {
             //if here it may be compaction or regular read or Delete event sorter
             //in the latter 2 cases we should do:
@@ -1109,7 +1115,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
                 deltaEventOptions);
             if (deltaPair.nextRecord() != null) {
-              readers.put(key, deltaPair);
+              ensurePutReader(key, deltaPair);
+              key = new ReaderKey();
             }
             continue;
           }
@@ -1123,13 +1130,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
           ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions);
           if (deltaPair.nextRecord() != null) {
-            readers.put(key, deltaPair);
+            ensurePutReader(key, deltaPair);
+            key = new ReaderKey();
           }
         }
       }
     }
 
     // get the first record
+    LOG.debug("Final reader map {}", readers);
     Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
     if (entry == null) {
       columns = 0;
@@ -1146,6 +1155,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
   }
 
+  private void ensurePutReader(ReaderKey key, ReaderPair deltaPair) throws IOException {
+    ReaderPair oldPair = readers.put(key, deltaPair);
+    if (oldPair == null) return;
+    String error = "Two readers for " + key + ": new " + deltaPair + ", old " + oldPair;
+    LOG.error(error);
+    throw new IOException(error);
+  }
+
   /**
    * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
    * type files into a base_x/ or delta_x_x.  The data in these are then assigned ROW_IDs at read
@@ -1352,6 +1369,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
       // if we are collapsing, figure out if this is a new row
       if (collapse || isSameRow) {
+        // Note: for collapse == false, this just sets keysSame.
         keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
         if (!keysSame) {
           prevKey.set(recordIdentifier);