You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/07/18 17:26:12 UTC

hive git commit: HIVE-17069 Refactor OrcRawRecrodMerger.ReaderPair (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/master 9807a560c -> 0534b9048


HIVE-17069 Refactor OrcRawRecrodMerger.ReaderPair (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0534b904
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0534b904
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0534b904

Branch: refs/heads/master
Commit: 0534b9048e8d3de222bf41763f651f0946013c2d
Parents: 9807a56
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Jul 18 10:25:56 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Jul 18 10:25:56 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 594 ++++++++++---------
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  91 ++-
 2 files changed, 367 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0534b904/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 650f2af..814782a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -54,7 +53,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
   private static final Logger LOG = LoggerFactory.getLogger(OrcRawRecordMerger.class);
 
-  private final Configuration conf;
   private final boolean collapse;
   private final RecordReader baseReader;
   private final ObjectInspector objectInspector;
@@ -186,30 +184,37 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           currentTransactionId + ", statementId: "+ statementId + "}";
     }
   }
-
+  interface ReaderPair {
+    OrcStruct nextRecord();
+    int getColumns();
+    RecordReader getRecordReader();
+    Reader getReader();
+    RecordIdentifier getMinKey();
+    RecordIdentifier getMaxKey();
+    ReaderKey getKey();
+    void next(OrcStruct next) throws IOException;
+  }
   /**
    * A reader and the next record from that reader. The code reads ahead so that
    * we can return the lowest ReaderKey from each of the readers. Thus, the
    * next available row is nextRecord and only following records are still in
    * the reader.
    */
-  static class ReaderPair {
-    OrcStruct nextRecord;
-    final Reader reader;
-    final RecordReader recordReader;
-    final ReaderKey key;
+  @VisibleForTesting
+  final static class ReaderPairAcid implements ReaderPair {
+    private OrcStruct nextRecord;
+    private final Reader reader;
+    private final RecordReader recordReader;
+    private final ReaderKey key;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
-    final int bucket;
     private final int statementId;
-    boolean advancedToMinKey = false;
 
     /**
      * Create a reader that reads from the first key larger than minKey to any
      * keys equal to maxKey.
      * @param key the key to read into
      * @param reader the ORC file reader
-     * @param bucket the bucket number for the file
      * @param minKey only return keys larger than minKey if it is non-null
      * @param maxKey only return keys less than or equal to maxKey if it is
      *               non-null
@@ -217,70 +222,71 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * @param statementId id of SQL statement within a transaction
      * @throws IOException
      */
-    ReaderPair(ReaderKey key, Reader reader, int bucket,
-               RecordIdentifier minKey, RecordIdentifier maxKey,
-               ReaderImpl.Options options, int statementId) throws IOException {
+    @VisibleForTesting
+    ReaderPairAcid(ReaderKey key, Reader reader,
+                   RecordIdentifier minKey, RecordIdentifier maxKey,
+                   ReaderImpl.Options options, int statementId) throws IOException {
       this.reader = reader;
       this.key = key;
-      this.minKey = minKey;
-      this.maxKey = maxKey;
-      this.bucket = bucket;
       // TODO use stripe statistics to jump over stripes
       recordReader = reader.rowsOptions(options);
       this.statementId = statementId;
+      this.minKey = minKey;
+      this.maxKey = maxKey;
+      // advance the reader until we reach the minimum key
+      do {
+        next(nextRecord());
+      } while (nextRecord() != null &&
+        (minKey != null && key.compareRow(getMinKey()) <= 0));
+    }
+    @Override public final OrcStruct nextRecord() {
+      return nextRecord;
     }
-    RecordReader getRecordReader() {
+    @Override
+    public final int getColumns() {
+      return getReader().getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
+    }
+
+    @Override public RecordReader getRecordReader() {
       return recordReader;
     }
-    /**
-     * This must be called right after the constructor but not in the constructor to make sure
-     * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called
-     */
-    void advnaceToMinKey() throws IOException {
-      advancedToMinKey = true;
-      // advance the reader until we reach the minimum key
-      do {
-        next(nextRecord);
-      } while (nextRecord != null &&
-          (getMinKey() != null && key.compareRow(getMinKey()) <= 0));
+    @Override public Reader getReader() { return reader; }
+    @Override public RecordIdentifier getMinKey() {
+      return minKey;
+    }
+    @Override public RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
+    @Override public ReaderKey getKey() {
+      return key;
     }
 
-    void next(OrcStruct next) throws IOException {
-      assert advancedToMinKey : "advnaceToMinKey() was not called";
+    @Override
+    public void next(OrcStruct next) throws IOException {
       if (getRecordReader().hasNext()) {
         nextRecord = (OrcStruct) getRecordReader().next(next);
         // set the key
-        key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
-            OrcRecordUpdater.getBucket(nextRecord),
-            OrcRecordUpdater.getRowId(nextRecord),
-            OrcRecordUpdater.getCurrentTransaction(nextRecord),
+        getKey().setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord()),
+            OrcRecordUpdater.getBucket(nextRecord()),
+            OrcRecordUpdater.getRowId(nextRecord()),
+            OrcRecordUpdater.getCurrentTransaction(nextRecord()),
             statementId);
 
         // if this record is larger than maxKey, we need to stop
-        if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
-          LOG.debug("key " + key + " > maxkey " + getMaxKey());
+        if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
+          LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
           nextRecord = null;
           getRecordReader().close();
         }
       } else {
         nextRecord = null;
-        recordReader.close();
+        getRecordReader().close();
       }
     }
-
-    RecordIdentifier getMinKey() {
-      return minKey;
-    }
-    RecordIdentifier getMaxKey() {
-      return maxKey;
-    }
-    int getColumns() {
-      return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
-    }
   }
 
   /**
-   * A reader that pretends an original base file is a new version base file.
+   * A reader that pretends an original base file is a new versioned base file.
    * It wraps the underlying reader's row with an ACID event object and
    * makes the relevant translations.
    * 
@@ -297,178 +303,38 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each
    * split of the original file and used to filter rows from all the deltas.  The ROW__ID.rowid for
    * the rows of the 'original' file of course, must be assigned from the beginning of logical
-   * bucket.
+   * bucket.  The last split of the logical bucket, i.e. the split that has the end of last file,
+   * should include all insert events from deltas.
    */
-  static final class OriginalReaderPair extends ReaderPair {
-    private final Options mergerOptions;
-    /**
-     * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list
-     */
-    private long rowIdOffset = 0;
-    /**
-     * See {@link AcidUtils.Directory#getOriginalFiles()}.  This list has a fixed sort order. This
-     * is the full list when compacting and empty when doing a simple read.  The later is because we
-     * only need to read the current split from 1 file for simple read.
-     */
-    private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
-    /**
-     * index into {@link #originalFiles}
-     */
-    private int nextFileIndex = 0;
-    private long numRowsInCurrentFile = 0;
-    private RecordReader originalFileRecordReader = null;
-    private final Configuration conf;
-    private final Reader.Options options;
-    private final RecordIdentifier minKey;//shadow parent minKey to make final
-    private final RecordIdentifier maxKey;//shadow parent maxKey to make final
-
-    OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
-                       final RecordIdentifier minKey, final RecordIdentifier maxKey,
-                       Reader.Options options, Options mergerOptions, Configuration conf,
-                       ValidTxnList validTxnList) throws IOException {
-      super(key, reader, bucket, minKey, maxKey, options, 0);
-      this.mergerOptions = mergerOptions;
-      this.conf = conf;
-      this.options = options;
-      assert mergerOptions.getRootPath() != null : "Since we have original files";
-      assert bucket >= 0 : "don't support non-bucketed tables yet";
+  private static abstract class OriginalReaderPair implements ReaderPair {
+    OrcStruct nextRecord;
+    private final ReaderKey key;
+    final int bucketId;
 
-      RecordIdentifier newMinKey = minKey;
-      RecordIdentifier newMaxKey = maxKey;
-      if(mergerOptions.isCompacting()) {
-        {
-          //when compacting each split needs to process the whole logical bucket
-          assert options.getOffset() == 0;
-          assert options.getMaxOffset() == Long.MAX_VALUE;
-          assert minKey == null;
-          assert maxKey == null;
-        }
-        AcidUtils.Directory directoryState = AcidUtils.getAcidState(
-          mergerOptions.getRootPath(), conf, validTxnList, false, true);
-        originalFiles = directoryState.getOriginalFiles();
-        assert originalFiles.size() > 0;
-        /**
-         * when there are no copyN files, the {@link #recordReader} will be the the one and only
-         * file for for 'bucket' but closing here makes flow cleaner and only happens once in the
-         * life of the table.  With copyN files, the caller may pass in any one of the copyN files.
-         * This is less prone to bugs than expecting the reader to pass in a Reader for the 1st file
-         * of a logical bucket.*/
-        recordReader.close();
-        reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
-        if(reader == null) {
-          //Compactor generated a split for a bucket that has no data?
-          throw new IllegalStateException("No 'original' files found for bucketId=" + bucket +
-            " in " + mergerOptions.getRootPath());
-        }
-        numRowsInCurrentFile = reader.getNumberOfRows();
-        originalFileRecordReader = reader.rowsOptions(options);
-      }
-      else {
-        /**
-         * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc  We don't
-         * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
-         * It's needed to correctly set maxKey.  In particular, set maxKey==null if this split
-         * is the tail of the last file for this logical bucket to include all deltas written after
-         * non-acid to acid table conversion.
-         */
-        boolean isLastFileForThisBucket = false;
-        boolean haveSeenCurrentFile = false;
-        originalFiles = Collections.emptyList();
-        if (mergerOptions.getCopyIndex() > 0) {
-          //the split is from something other than the 1st file of the logical bucket - compute offset
-          
-          AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-            conf, validTxnList, false, true);
-          for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-            AcidOutputFormat.Options bucketOptions =
-              AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucketId() != bucket) {
-              continue;
-            }
-            if(haveSeenCurrentFile) {
-              //if here we already saw current file and now found another file for the same bucket
-              //so the current file is not the last file of the logical bucket
-              isLastFileForThisBucket = false;
-              break;
-            }
-            if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
-              /**
-               * found the file whence the current split is from so we're done
-               * counting {@link rowIdOffset}
-               */
-              haveSeenCurrentFile = true;
-              isLastFileForThisBucket = true;
-              continue;
-            }
-            Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
-              OrcFile.readerOptions(conf));
-            rowIdOffset += copyReader.getNumberOfRows();
-          }
-          if (rowIdOffset > 0) {
-            //rowIdOffset could be 0 if all files before current one are empty
-            /**
-             * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
-             * int, Reader.Options)} need to fix min/max key since these are used by
-             * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
-             * the key.  Clear?  */
-            if (minKey != null) {
-              minKey.setRowId(minKey.getRowId() + rowIdOffset);
-            }
-            else {
-              /**
-               *  If this is not the 1st file, set minKey 1 less than the start of current file
-               * (Would not need to set minKey if we knew that there are no delta files)
-               * {@link #advanceToMinKey()} needs this */
-              newMinKey = new RecordIdentifier(0, bucket, rowIdOffset - 1);
-            }
-            if (maxKey != null) {
-              maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
-            }
-          }
-        } else {
-          isLastFileForThisBucket = true;
-          AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-            conf, validTxnList, false, true);
-          int numFilesInBucket= 0;
-          for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-            AcidOutputFormat.Options bucketOptions =
-              AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucketId() == bucket) {
-              numFilesInBucket++;
-              if(numFilesInBucket > 1) {
-                isLastFileForThisBucket = false;
-                break;
-              }
-            }
-          }
-        }
-        originalFileRecordReader = recordReader;
-        if(!isLastFileForThisBucket && maxKey == null) {
-          /*
-           * If this is the last file for this bucket, maxKey == null means the split is the tail
-           * of the file so we want to leave it blank to make sure any insert events in delta
-           * files are included; Conversely, if it's not the last file, set the maxKey so that
-           * events from deltas that don't modify anything in the current split are excluded*/
-          newMaxKey = new RecordIdentifier(0, bucket,
-            rowIdOffset + reader.getNumberOfRows() - 1);
-        }
-      }
-      this.minKey = newMinKey;
-      this.maxKey = newMaxKey;
-    }
-    @Override RecordReader getRecordReader() {
-      return originalFileRecordReader;
+    OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+      this.key = key;
+      this.bucketId = bucketId;
+      assert bucketId >= 0 : "don't support non-bucketed tables yet";
     }
-    @Override RecordIdentifier getMinKey() {
-      return minKey;
+    @Override public final OrcStruct nextRecord() {
+      return nextRecord;
     }
-    @Override RecordIdentifier getMaxKey() {
-      return maxKey;
+    @Override
+    public int getColumns() {
+      return getReader().getTypes().get(0).getSubtypesCount();
     }
-    private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
-      if (originalFileRecordReader.hasNext()) {
+    @Override
+    public final ReaderKey getKey() { return key; }
+    /**
+     * The cumulative number of rows in all files of the logical bucket that precede the file
+     * represented by {@link #getRecordReader()}
+     */
+    abstract long getRowIdOffset();
+
+    final boolean nextFromCurrentFile(OrcStruct next) throws IOException {
+      if (getRecordReader().hasNext()) {
         //RecordReader.getRowNumber() produces a file-global row number even with PPD
-        long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset;
+        long nextRowId = getRecordReader().getRowNumber() + getRowIdOffset();
         // have to do initialization here, because the super's constructor
         // calls next and thus we need to initialize before our constructor
         // runs
@@ -476,17 +342,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS);
           IntWritable operation =
               new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
-          nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation);
-          nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+          nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
+          nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
               new LongWritable(0));
-          nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+          nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
               new LongWritable(0));
-          nextRecord.setFieldValue(OrcRecordUpdater.BUCKET,
-              new IntWritable(bucket));
-          nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
+          nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
+              new IntWritable(bucketId));
+          nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
               new LongWritable(nextRowId));
-          nextRecord.setFieldValue(OrcRecordUpdater.ROW,
-              originalFileRecordReader.next(null));
+          nextRecord().setFieldValue(OrcRecordUpdater.ROW,
+              getRecordReader().next(null));
         } else {
           nextRecord = next;
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
@@ -494,18 +360,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
               .set(0);
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
-              .set(bucket);
+              .set(bucketId);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
               .set(0);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
               .set(nextRowId);
-          nextRecord.setFieldValue(OrcRecordUpdater.ROW,
-              originalFileRecordReader.next(OrcRecordUpdater.getRow(next)));
+          nextRecord().setFieldValue(OrcRecordUpdater.ROW,
+              getRecordReader().next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(0L, bucket, nextRowId, 0L, 0);
-        if (maxKey != null && key.compareRow(maxKey) > 0) {
+        key.setValues(0L, bucketId, nextRowId, 0L, 0);
+        if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("key " + key + " > maxkey " + maxKey);
+            LOG.debug("key " + key + " > maxkey " + getMaxKey());
           }
           return false;//reached End Of Split
         }
@@ -513,9 +379,199 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
       return false;//reached EndOfFile
     }
+  }
+  @VisibleForTesting
+  final static class OriginalReaderPairToRead extends OriginalReaderPair {
+    private final long rowIdOffset;
+    private final Reader reader;
+    private final RecordReader recordReader;
+    private final RecordIdentifier minKey;
+    private final RecordIdentifier maxKey;
+
+    OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
+                             final RecordIdentifier minKey, final RecordIdentifier maxKey,
+                             Reader.Options options, Options mergerOptions, Configuration conf,
+                             ValidTxnList validTxnList) throws IOException {
+      super(key, bucketId);
+      this.reader = reader;
+      assert !mergerOptions.isCompacting();
+      assert mergerOptions.getRootPath() != null : "Since we have original files";
+
+      RecordIdentifier newMinKey = minKey;
+      RecordIdentifier newMaxKey = maxKey;
+      recordReader = reader.rowsOptions(options);
+      /**
+       * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc  We don't
+       * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
+       * It's needed to correctly set maxKey.  In particular, set maxKey==null if this split
+       * is the tail of the last file for this logical bucket to include all deltas written after
+       * non-acid to acid table conversion.
+       */
+      boolean isLastFileForThisBucket = false;
+      boolean haveSeenCurrentFile = false;
+      long rowIdOffsetTmp = 0;
+      if (mergerOptions.getCopyIndex() > 0) {
+        //the split is from something other than the 1st file of the logical bucket - compute offset
+
+        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+          conf, validTxnList, false, true);
+        for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+          AcidOutputFormat.Options bucketOptions =
+            AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+          if (bucketOptions.getBucketId() != bucketId) {
+            continue;
+          }
+          if (haveSeenCurrentFile) {
+            //if here we already saw current file and now found another file for the same bucket
+            //so the current file is not the last file of the logical bucket
+            isLastFileForThisBucket = false;
+            break;
+          }
+          if (f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
+            /**
+             * found the file whence the current split is from so we're done
+             * counting {@link #rowIdOffset}
+             */
+            haveSeenCurrentFile = true;
+            isLastFileForThisBucket = true;
+            continue;
+          }
+          Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
+            OrcFile.readerOptions(conf));
+          rowIdOffsetTmp += copyReader.getNumberOfRows();
+        }
+        this.rowIdOffset = rowIdOffsetTmp;
+        if (rowIdOffset > 0) {
+          //rowIdOffset could be 0 if all files before current one are empty
+          /**
+           * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
+           * int, Reader.Options)} need to fix min/max key since these are used by
+           * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
+           * the key.  Clear?  */
+          if (minKey != null) {
+            minKey.setRowId(minKey.getRowId() + rowIdOffset);
+          } else {
+            /**
+             *  If this is not the 1st file, set minKey 1 less than the start of current file
+             * (Would not need to set minKey if we knew that there are no delta files)
+             * the advance-to-minKey loop at the end of this constructor needs this */
+            newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+          }
+          if (maxKey != null) {
+            maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
+          }
+        }
+      } else {
+        rowIdOffset = 0;
+        isLastFileForThisBucket = true;
+        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+          conf, validTxnList, false, true);
+        int numFilesInBucket = 0;
+        for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+          AcidOutputFormat.Options bucketOptions =
+            AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+          if (bucketOptions.getBucketId() == bucketId) {
+            numFilesInBucket++;
+            if (numFilesInBucket > 1) {
+              isLastFileForThisBucket = false;
+              break;
+            }
+          }
+        }
+      }
+      if (!isLastFileForThisBucket && maxKey == null) {
+          /*
+           * If this is the last file for this bucket, maxKey == null means the split is the tail
+           * of the file so we want to leave it blank to make sure any insert events in delta
+           * files are included; Conversely, if it's not the last file, set the maxKey so that
+           * events from deltas that don't modify anything in the current split are excluded*/
+        newMaxKey = new RecordIdentifier(0, bucketId,
+          rowIdOffset + reader.getNumberOfRows() - 1);
+      }
+      this.minKey = newMinKey;
+      this.maxKey = newMaxKey;
+
+      // advance the reader until we reach the minimum key
+      do {
+        next(nextRecord());
+      } while (nextRecord() != null &&
+        (getMinKey() != null && this.getKey().compareRow(getMinKey()) <= 0));
+    }
+    @Override public RecordReader getRecordReader() {
+      return recordReader;
+    }
+    @Override public Reader getReader() { return reader; }
+    @Override public RecordIdentifier getMinKey() { return minKey; }
+    @Override public RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
+    @Override public long getRowIdOffset() { return rowIdOffset; }
+
+    @Override
+    public void next(OrcStruct next) throws IOException {
+      if(!nextFromCurrentFile(next)) {
+        //only have 1 file so done
+        nextRecord = null;
+        getRecordReader().close();
+      }
+    }
+  }
+  @VisibleForTesting
+  final static class OriginalReaderPairToCompact extends OriginalReaderPair {
+    /**
+     * See {@link AcidUtils.Directory#getOriginalFiles()}.  This list has a fixed sort order.
+     * It includes all original files (for all buckets).  
+     */
+    private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
+    /**
+     * index into {@link #originalFiles}
+     */
+    private int nextFileIndex = 0;
+    private Reader reader;
+    private RecordReader recordReader = null;
+    private final Configuration conf;
+    private final Reader.Options options;
+    private long rowIdOffset = 0;
+
+    OriginalReaderPairToCompact(ReaderKey key, int bucketId,
+                       Reader.Options options, Options mergerOptions, Configuration conf,
+                       ValidTxnList validTxnList) throws IOException {
+      super(key, bucketId);
+      assert mergerOptions.isCompacting() : "Should only be used for Compaction";
+      this.conf = conf;
+      this.options = options;
+      assert mergerOptions.getRootPath() != null : "Since we have original files";
+      assert this.bucketId >= 0 : "don't support non-bucketed tables yet";
+      //when compacting each split needs to process the whole logical bucket
+      assert options.getOffset() == 0;
+      assert options.getMaxOffset() == Long.MAX_VALUE;
+      AcidUtils.Directory directoryState = AcidUtils.getAcidState(
+        mergerOptions.getRootPath(), conf, validTxnList, false, true);
+      originalFiles = directoryState.getOriginalFiles();
+      assert originalFiles.size() > 0;
+      this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+      if (reader == null) {
+        //Compactor generated a split for a bucket that has no data?
+        throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
+          " in " + mergerOptions.getRootPath());
+      }
+      recordReader = getReader().rowsOptions(options);
+      next(nextRecord());//load 1st row
+    }
+    @Override public RecordReader getRecordReader() {
+      return recordReader;
+    }
+    @Override public Reader getReader() { return reader; }
+    @Override public RecordIdentifier getMinKey() {
+      return null;
+    }
+    @Override public RecordIdentifier getMaxKey() {
+      return null;
+    }
+    @Override public long getRowIdOffset() { return rowIdOffset; }
+
     @Override
-    void next(OrcStruct next) throws IOException {
-      assert advancedToMinKey : "advnaceToMinKey() was not called";
+    public void next(OrcStruct next) throws IOException {
       while(true) {
         if(nextFromCurrentFile(next)) {
           return;
@@ -523,19 +579,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           if (originalFiles.size() <= nextFileIndex) {
             //no more original files to read
             nextRecord = null;
-            originalFileRecordReader.close();
+            recordReader.close();
             return;
           } else {
-            assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting";
-            rowIdOffset += numRowsInCurrentFile;
-            originalFileRecordReader.close();
-            Reader reader = advanceToNextFile();
+            rowIdOffset += reader.getNumberOfRows();
+            recordReader.close();
+            reader = advanceToNextFile();
             if(reader == null) {
               nextRecord = null;
               return;
             }
-            numRowsInCurrentFile = reader.getNumberOfRows();
-            originalFileRecordReader = reader.rowsOptions(options);
+            recordReader = reader.rowsOptions(options);
           }
         }
       }
@@ -546,21 +600,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      */
     private Reader advanceToNextFile() throws IOException {
       while(nextFileIndex < originalFiles.size()) {
-        AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
-        if (bucketOptions.getBucketId() == bucket) {
+        AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(
+          originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
+        if (bucketOptions.getBucketId() == bucketId) {
           break;
         }
+        //not the bucket we care about here
         nextFileIndex++;
       }
       if(originalFiles.size() <= nextFileIndex) {
         return null;//no more files for current bucket
       }
-      return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf));
-    }
-
-    @Override
-    int getColumns() {
-      return reader.getTypes().get(0).getSubtypesCount();
+      return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().
+        getPath(), OrcFile.readerOptions(conf));
     }
   }
 
@@ -569,8 +621,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * {@link ReaderKey} ascending.  The output of this Reader should a global order across these
    * files.  The root of this tree is always the next 'file' to read from.
    */
-  private final TreeMap<ReaderKey, ReaderPair> readers =
-      new TreeMap<ReaderKey, ReaderPair>();
+  private final TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();
 
   // The reader that currently has the lowest key.
   private ReaderPair primary;
@@ -637,7 +688,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   }
 
   /**
-   * Find the key range for bucket files.
+   * Find the key range for the split (of the base).  These are used to filter delta files since
+   * both are sorted by key.
    * @param reader the reader
    * @param options the options for reading with
    * @throws IOException
@@ -681,7 +733,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    */
   static Reader.Options createEventOptions(Reader.Options options) {
     Reader.Options result = options.clone();
-    result.range(options.getOffset(), Long.MAX_VALUE);
+    //result.range(options.getOffset(), Long.MAX_VALUE); TODO: confirm this range override is no longer needed
     result.include(options.getInclude());
 
     // slide the column names down by 6 for the name array
@@ -757,7 +809,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                      ValidTxnList validTxnList,
                      Reader.Options options,
                      Path[] deltaDirectory, Options mergerOptions) throws IOException {
-    this.conf = conf;
     this.collapse = collapseEvents;
     this.offset = options.getOffset();
     this.length = options.getLength();
@@ -788,18 +839,23 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       ReaderKey key = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
-        pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-                                      options, mergerOptions, conf, validTxnList);
+        if(mergerOptions.isCompacting()) {
+          pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
+            conf, validTxnList);
+        }
+        else {
+          pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
+            keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+        }
       } else {
-        pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-                              eventOptions, 0);
+        pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+          eventOptions, 0);
       }
       minKey = pair.getMinKey();
       maxKey = pair.getMaxKey();
       LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
-      pair.advnaceToMinKey();
       // if there is at least one record, put it in the map
-      if (pair.nextRecord != null) {
+      if (pair.nextRecord() != null) {
         readers.put(key, pair);
       }
       baseReader = pair.getRecordReader();
@@ -828,11 +884,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
               deltaEventOptions = eventOptions.clone().searchArgument(null, null);
             }
           }
-          ReaderPair deltaPair;
-          deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
-            maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
-          deltaPair.advnaceToMinKey();
-          if (deltaPair.nextRecord != null) {
+          ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+            deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+          if (deltaPair.nextRecord() != null) {
             readers.put(key, deltaPair);
           }
         }
@@ -883,8 +937,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     while (keysSame && primary != null) {
 
       // The primary's nextRecord is the next value to return
-      OrcStruct current = primary.nextRecord;
-      recordIdentifier.set(primary.key);
+      OrcStruct current = primary.nextRecord();
+      recordIdentifier.set(primary.getKey());
 
       // Advance the primary reader to the next record
       primary.next(extraValue);
@@ -895,12 +949,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
       // now that the primary reader has advanced, we need to see if we
       // continue to read it or move to the secondary.
-      if (primary.nextRecord == null ||
-          primary.key.compareTo(secondaryKey) > 0) {
+      if (primary.nextRecord() == null ||
+          primary.getKey().compareTo(secondaryKey) > 0) {
 
         // if the primary isn't done, push it back into the readers
-        if (primary.nextRecord != null) {
-          readers.put(primary.key, primary);
+        if (primary.nextRecord() != null) {
+          readers.put(primary.getKey(), primary);
         }
 
         // update primary and secondaryKey
@@ -967,10 +1021,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   @Override
   public void close() throws IOException {
     if (primary != null) {
-      primary.recordReader.close();
+      primary.getRecordReader().close();
     }
     for(ReaderPair pair: readers.values()) {
-      pair.recordReader.close();
+      pair.getRecordReader().close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0534b904/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 2406af5..ba8d675 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -186,25 +185,24 @@ public class TestOrcRawRecordMerger {
     Reader reader = createMockReader();
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
-    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, minKey, maxKey,
         new Reader.Options(), 0);
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketProperty());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
     Mockito.verify(recordReader).close();
   }
 
@@ -213,46 +211,45 @@ public class TestOrcRawRecordMerger {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockReader();
 
-    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, null, null,
         new Reader.Options(), 0);
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(20, key.getRowId());
     assertEquals(100, key.getCurrentTransactionId());
-    assertEquals("first", value(pair.nextRecord));
+    assertEquals("first", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(30, key.getRowId());
     assertEquals(110, key.getCurrentTransactionId());
-    assertEquals("second", value(pair.nextRecord));
+    assertEquals("second", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketProperty());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketProperty());
     assertEquals(61, key.getRowId());
     assertEquals(140, key.getCurrentTransactionId());
-    assertEquals("fifth", value(pair.nextRecord));
+    assertEquals("fifth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
     Mockito.verify(recordReader).close();
   }
 
@@ -296,25 +293,24 @@ public class TestOrcRawRecordMerger {
     Path root = new Path(tmpDir, "testOriginalReaderPair");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, minKey, maxKey,
         new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
     Mockito.verify(recordReader).close();
   }
 
@@ -331,46 +327,45 @@ public class TestOrcRawRecordMerger {
     Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, null, null,
         new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
-    pair.advnaceToMinKey();
-    assertEquals("first", value(pair.nextRecord));
+    assertEquals("first", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(0, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("second", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("second", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(1, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("third", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("third", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("fourth", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("fourth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("fifth", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("fifth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(4, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(pair.recordReader).close();
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
+    Mockito.verify(pair.getRecordReader()).close();
   }
 
   @Test
@@ -437,11 +432,11 @@ public class TestOrcRawRecordMerger {
     OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
         false, 10, createMaximalTxnList(),
         new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
-    RecordReader rr = merger.getCurrentReader().recordReader;
+    RecordReader rr = merger.getCurrentReader().getRecordReader();
     assertEquals(0, merger.getOtherReaders().size());
 
-    assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
-    assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
+    assertEquals("" + merger.getMinKey(),new RecordIdentifier(10, 20, 30), merger.getMinKey());
+    assertEquals("" + merger.getMaxKey(), new RecordIdentifier(40, 50, 60), merger.getMaxKey());
     RecordIdentifier id = merger.createKey();
     OrcStruct event = merger.createValue();