Posted to commits@hive.apache.org by ek...@apache.org on 2017/07/11 04:51:43 UTC

[2/2] hive git commit: HIVE-16177 non Acid to acid conversion doesn't handle _copy_N files (Eugene Koifman, reviewed by Sergey Shelukhin)

HIVE-16177 non Acid to acid conversion doesn't handle _copy_N files (Eugene Koifman, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/37f84ca0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/37f84ca0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/37f84ca0

Branch: refs/heads/master
Commit: 37f84ca096de15ee7a7a0779705e504fe9c96b4e
Parents: 6489352
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 10 21:50:56 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 10 21:51:20 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |   2 +-
 .../hadoop/hive/ql/io/AcidInputFormat.java      |   5 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  21 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  58 ++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  63 +--
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 406 +++++++++++++++++--
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |   9 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |   9 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  54 ++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  62 +++
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  19 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  18 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  28 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   2 +-
 .../queries/clientpositive/insert_orig_table.q  |  20 +-
 .../clientpositive/insert_values_orig_table.q   |  20 +-
 .../insert_values_orig_table_use_metadata.q     |  23 +-
 .../clientpositive/insert_orig_table.q.out      |  68 +++-
 .../insert_values_orig_table.q.out              |  68 +++-
 .../insert_values_orig_table_use_metadata.q.out | 104 ++++-
 .../clientpositive/llap/insert_orig_table.q.out |  68 +++-
 22 files changed, 977 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a1cf76b..ba97d22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1205,7 +1205,7 @@ public final class Utilities {
    * Group 6: copy     [copy keyword]
    * Group 8: 2        [copy file index]
    */
-  private static final String COPY_KEYWORD = "_copy_"; // copy keyword
+  public static final String COPY_KEYWORD = "_copy_"; // copy keyword
   private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
       Pattern.compile("^.*?"+ // any prefix
                       "([0-9]+)"+ // taskId

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 7c7074d..25177ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,10 +112,13 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     private long minTxnId;
     private long maxTxnId;
     private List<Integer> stmtIds;
-    
+
     public DeltaMetaData() {
       this(0,0,new ArrayList<Integer>());
     }
+    /**
+     * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
+     */
     DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
       this.minTxnId = minTxnId;
       this.maxTxnId = maxTxnId;

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index b85b827..405cfde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.WritableComparable;
@@ -51,6 +52,11 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private long minimumTransactionId;
     private long maximumTransactionId;
     private int bucket;
+    /**
+     * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)}
+     * _copy_N starts with 1.
+     */
+    private int copyNumber = 0;
     private PrintStream dummyStream = null;
     private boolean oldStyle = false;
     private int recIdCol = -1;  // Column the record identifier is in, -1 indicates no record id
@@ -180,6 +186,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     }
 
     /**
+     * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket
+     * file.
+     * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+     * @param copyNumber the number of the copy ( > 0)
+     * @return this
+     */
+    public Options copyNumber(int copyNumber) {
+      this.copyNumber = copyNumber;
+      return this;
+    }
+
+    /**
      * Whether it should use the old style (0000000_0) filenames.
      * @param value should use the old style names
      * @return this
@@ -293,6 +311,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     public int getStatementId() {
       return statementId;
     }
+    public int getCopyNumber() {
+      return copyNumber;
+    }
     public Path getFinalDestination() {
       return finalDestination;
     }
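
A minimal sketch of how the new copyNumber option plugs into the Options builder shown above, assuming the usual Configuration-based constructor; the class name is invented and the values describe a hypothetical legacy file 000001_0_copy_2:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;

public class CopyNumberOptionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Describe the legacy file 000001_0_copy_2: bucket 1, second extra copy.
    AcidOutputFormat.Options opts = new AcidOutputFormat.Options(conf)
        .setOldStyle(true)
        .minimumTransactionId(0)
        .maximumTransactionId(0)
        .bucket(1)
        .copyNumber(2)     // new in this patch; 0 (the default) means no _copy_N suffix
        .writingBase(true);
    System.out.println("bucket=" + opts.getBucket() + ", copyNumber=" + opts.getCopyNumber());
  }
}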

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index da00bb3..1c03736 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -48,6 +49,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
  * are used by the compactor and cleaner and thus must be format agnostic.
@@ -99,6 +102,10 @@ public class AcidUtils {
   public static final int MAX_STATEMENTS_PER_TXN = 10000;
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern   LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+  /**
+   * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
+   * a "delta" dir written by a real Acid write, which cannot have any copies
+   */
   public static final PathFilter originalBucketFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -113,6 +120,11 @@ public class AcidUtils {
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
+  /**
+   * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+   */
+  private static final Pattern ORIGINAL_PATTERN_COPY =
+    Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+");
 
   public static final PathFilter hiddenFileFilter = new PathFilter(){
     @Override
@@ -243,7 +255,21 @@ public class AcidUtils {
           .maximumTransactionId(0)
           .bucket(bucket)
           .writingBase(true);
-    } else if (filename.startsWith(BUCKET_PREFIX)) {
+    }
+    else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
+      //todo: define groups in regex and use parseInt(Matcher.group(2))....
+      int bucket =
+        Integer.parseInt(filename.substring(0, filename.indexOf('_')));
+      int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
+      result
+        .setOldStyle(true)
+        .minimumTransactionId(0)
+        .maximumTransactionId(0)
+        .bucket(bucket)
+        .copyNumber(copyNumber)
+        .writingBase(true);
+    }
+    else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
           Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
       if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
@@ -482,7 +508,7 @@ public class AcidUtils {
     Path getBaseDirectory();
 
     /**
-     * Get the list of original files.  Not {@code null}.
+     * Get the list of original files.  Not {@code null}.  Must be sorted.
      * @return the list of original files (eg. 000000_0)
      */
     List<HdfsFileStatusWithId> getOriginalFiles();
@@ -825,7 +851,7 @@ public class AcidUtils {
       // Okay, we're going to need these originals.  Recurse through them and figure out what we
       // really need.
       for (FileStatus origDir : originalDirectories) {
-        findOriginals(fs, origDir, original, useFileIds);
+        findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles);
       }
     }
 
@@ -893,7 +919,17 @@ public class AcidUtils {
     final Path base = bestBase.status == null ? null : bestBase.status.getPath();
     LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
         deltas.size());
-
+    /**
+     * If this sort order is changed and there are tables that have been converted to transactional
+     * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it
+     * may result in data loss since it may change how
+     * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns 
+     * {@link RecordIdentifier#rowId} for reads (that have already happened) and compactions (yet to happen).
+     */
+    Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> {
+      //this does "Path.uri.compareTo(that.uri)"
+      return o1.getFileStatus().compareTo(o2.getFileStatus());
+    });
     return new Directory(){
 
       @Override
@@ -1011,7 +1047,7 @@ public class AcidUtils {
    * @throws IOException
    */
   private static void findOriginals(FileSystem fs, FileStatus stat,
-      List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds) throws IOException {
+      List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
     assert stat.isDir();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
@@ -1031,18 +1067,22 @@ public class AcidUtils {
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDir()) {
-          findOriginals(fs, child.getFileStatus(), original, useFileIds);
+          findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles);
         } else {
-          original.add(child);
+          if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) {
+            original.add(child);
+          }
         }
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter);
       for (FileStatus child : children) {
         if (child.isDir()) {
-          findOriginals(fs, child, original, useFileIds);
+          findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles);
         } else {
-          original.add(createOriginalObj(null, child));
+          if(!ignoreEmptyFiles || child.getLen() > 0) {
+            original.add(createOriginalObj(null, child));
+          }
         }
       }
     }
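
The new ORIGINAL_PATTERN_COPY branch derives the bucket id from the prefix of the file name and the copy number from its suffix. A self-contained sketch of that substring arithmetic (illustrative names, not the patch's code):

public class ParseCopyBucketSketch {
  /** Mirrors the substring parsing in the ORIGINAL_PATTERN_COPY branch above. */
  static int[] parseBucketAndCopy(String filename) {
    int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_')));
    int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
    return new int[]{bucket, copyNumber};
  }

  public static void main(String[] args) {
    int[] parsed = parseBucketAndCopy("000001_0_copy_2");
    // prints bucket=1, copyNumber=2
    System.out.println("bucket=" + parsed[0] + ", copyNumber=" + parsed[1]);
  }
}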

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0ef7c75..f9e17a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1677,7 +1677,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         // independent split strategies for them. There is a global flag 'isOriginal' that is set
         // on a per split strategy basis and it has to be same for all the files in that strategy.
         List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
-            adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
+            adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
             allowSyntheticFileIds);
 
         for (SplitStrategy<?> splitStrategy : splitStrategies) {
@@ -1927,7 +1927,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       } else {
         root = path.getParent().getParent();
       }
-    } else {
+    } else {//here path is a delta/ but above it's a partition/
       root = path;
     }
 
@@ -1947,7 +1947,23 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     final Configuration conf = options.getConfiguration();
 
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
-    final int bucket = OrcInputFormat.getBucketForSplit(conf, split);
+    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
+    mergerOptions.rootPath(root);
+    final int bucket;
+    if (split.hasBase()) {
+      AcidOutputFormat.Options acidIOOptions =
+        AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf);
+      if(acidIOOptions.getBucket() < 0) {
+        LOG.warn("Can't determine bucket ID for " + split.getPath() + "; ignoring");
+      }
+      bucket = acidIOOptions.getBucket();
+      if(split.isOriginal()) {
+        mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath());
+      }
+    } else {
+      bucket = (int) split.getStart();
+    }
+
     final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
     readOptions.range(split.getStart(), split.getLength());
 
@@ -1956,7 +1972,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       new ValidReadTxnList(txnString);
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
-            validTxnList, readOptions, deltas);
+            validTxnList, readOptions, deltas, mergerOptions);
     return new RowReader<OrcStruct>() {
       OrcStruct innerRecord = records.createValue();
 
@@ -2008,20 +2024,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     };
   }
-
-  static Path findOriginalBucket(FileSystem fs,
+  private static Path findOriginalBucket(FileSystem fs,
                                  Path directory,
                                  int bucket) throws IOException {
     for(FileStatus stat: fs.listStatus(directory)) {
-      String name = stat.getPath().getName();
-      String numberPart = name.substring(0, name.indexOf('_'));
-      if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) &&
-          Integer.parseInt(numberPart) == bucket) {
+      if(stat.getLen() <= 0) {
+        continue;
+      }
+      AcidOutputFormat.Options bucketInfo =
+        AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
+      if(bucketInfo.getBucket() == bucket) {
         return stat.getPath();
       }
     }
-    throw new IllegalArgumentException("Can't find bucket " + bucket + " in " +
-        directory);
+    throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory);
   }
 
   static Reader.Options createOptionsForReader(Configuration conf) {
@@ -2054,14 +2070,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return reader;
   }
 
-  static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) {
-    if (orcSplit.hasBase()) {
-      return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
-    } else {
-      return (int) orcSplit.getStart();
-    }
-  }
-
   public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
       OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
       List<StripeStatistics> stripeStats, int stripeCount) {
@@ -2134,7 +2142,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
   @VisibleForTesting
   static List<SplitStrategy<?>> determineSplitStrategies(CombinedCtx combinedCtx, Context context,
-      FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+      FileSystem fs, Path dir,
       List<AcidBaseFileInfo> baseFiles,
       List<ParsedDelta> parsedDeltas,
       List<OrcProto.Type> readerTypes,
@@ -2145,7 +2153,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     // When no baseFiles, we will just generate a single split strategy and return.
     List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
     if (baseFiles.isEmpty()) {
-      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir,
           acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
       if (splitStrategy != null) {
         splitStrategies.add(splitStrategy);
@@ -2165,7 +2173,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // Generate split strategy for non-acid schema original files, if any.
     if (!originalSchemaFiles.isEmpty()) {
-      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir,
           originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
       if (splitStrategy != null) {
         splitStrategies.add(splitStrategy);
@@ -2174,7 +2182,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // Generate split strategy for acid schema files, if any.
     if (!acidSchemaFiles.isEmpty()) {
-      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir,
           acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
       if (splitStrategy != null) {
         splitStrategies.add(splitStrategy);
@@ -2186,7 +2194,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
   @VisibleForTesting
   static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
-      FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+      FileSystem fs, Path dir,
       List<HdfsFileStatusWithId> baseFiles,
       boolean isOriginal,
       List<ParsedDelta> parsedDeltas,
@@ -2261,8 +2269,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
       reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
     }
+    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
+      .isCompacting(true)
+      .rootPath(baseDirectory);
     return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
-        bucket, validTxnList, new Reader.Options(), deltaDirectory);
+        bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
   }
 
   /**
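
A simplified, standalone model of the rewritten findOriginalBucket() above: it skips zero-length files and matches on the parsed bucket id so that copy_N files qualify too. The Map stands in for FileSystem.listStatus(); nothing here is the Hive API itself and the names are invented:

import java.util.LinkedHashMap;
import java.util.Map;

public class FindOriginalBucketSketch {
  /** Skip empty files and match on the parsed bucket id, so 000001_0_copy_N also qualifies. */
  static String findOriginalBucket(Map<String, Long> fileToLength, int bucket) {
    for (Map.Entry<String, Long> e : fileToLength.entrySet()) {
      if (e.getValue() <= 0) {
        continue;                                   // ignore empty files
      }
      String name = e.getKey();
      int parsedBucket = Integer.parseInt(name.substring(0, name.indexOf('_')));
      if (parsedBucket == bucket) {
        return name;
      }
    }
    throw new IllegalArgumentException("Can't find bucket " + bucket);
  }

  public static void main(String[] args) {
    Map<String, Long> dir = new LinkedHashMap<>();
    dir.put("000000_0", 0L);                        // empty file from an insert that wrote no rows
    dir.put("000000_0_copy_1", 1234L);
    System.out.println(findOriginalBucket(dir, 0)); // prints 000000_0_copy_1
  }
}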

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 95b8806..ffcdf6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
@@ -59,11 +62,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   private final long length;
   private final ValidTxnList validTxnList;
   private final int columns;
-  private ReaderKey prevKey = new ReaderKey();
+  private final ReaderKey prevKey = new ReaderKey();
   // this is the key less than the lowest key we need to process
-  private RecordIdentifier minKey;
+  private final RecordIdentifier minKey;
   // this is the last key we need to process
-  private RecordIdentifier maxKey;
+  private final RecordIdentifier maxKey;
   // an extra value so that we can return it while reading ahead
   private OrcStruct extraValue;
 
@@ -156,7 +159,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
     }
 
-    public long getCurrentTransactionId() {
+    long getCurrentTransactionId() {
       return currentTransactionId;
     }
 
@@ -165,7 +168,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * @param other the value to compare to
      * @return -1, 0, +1
      */
-    public int compareRow(RecordIdentifier other) {
+    int compareRow(RecordIdentifier other) {
       return compareToInternal(other);
     }
 
@@ -188,9 +191,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     final Reader reader;
     final RecordReader recordReader;
     final ReaderKey key;
-    final RecordIdentifier maxKey;
+    private final RecordIdentifier minKey;
+    private final RecordIdentifier maxKey;
     final int bucket;
     private final int statementId;
+    boolean advancedToMinKey = false;
 
     /**
      * Create a reader that reads from the first key larger than minKey to any
@@ -210,21 +215,33 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                ReaderImpl.Options options, int statementId) throws IOException {
       this.reader = reader;
       this.key = key;
+      this.minKey = minKey;
       this.maxKey = maxKey;
       this.bucket = bucket;
       // TODO use stripe statistics to jump over stripes
       recordReader = reader.rowsOptions(options);
       this.statementId = statementId;
+    }
+    RecordReader getRecordReader() {
+      return recordReader;
+    }
+    /**
+     * This must be called right after the constructor but not in the constructor to make sure
+     * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called
+     */
+    void advanceToMinKey() throws IOException {
+      advancedToMinKey = true;
       // advance the reader until we reach the minimum key
       do {
         next(nextRecord);
       } while (nextRecord != null &&
-          (minKey != null && key.compareRow(minKey) <= 0));
+          (getMinKey() != null && key.compareRow(getMinKey()) <= 0));
     }
 
     void next(OrcStruct next) throws IOException {
-      if (recordReader.hasNext()) {
-        nextRecord = (OrcStruct) recordReader.next(next);
+      assert advancedToMinKey : "advanceToMinKey() was not called";
+      if (getRecordReader().hasNext()) {
+        nextRecord = (OrcStruct) getRecordReader().next(next);
         // set the key
         key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
             OrcRecordUpdater.getBucket(nextRecord),
@@ -233,10 +250,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             statementId);
 
         // if this record is larger than maxKey, we need to stop
-        if (maxKey != null && key.compareRow(maxKey) > 0) {
-          LOG.debug("key " + key + " > maxkey " + maxKey);
+        if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
+          LOG.debug("key " + key + " > maxkey " + getMaxKey());
           nextRecord = null;
-          recordReader.close();
+          getRecordReader().close();
         }
       } else {
         nextRecord = null;
@@ -244,6 +261,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
     }
 
+    RecordIdentifier getMinKey() {
+      return minKey;
+    }
+    RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
     int getColumns() {
       return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
     }
@@ -253,18 +276,192 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * A reader that pretends an original base file is a new version base file.
    * It wraps the underlying reader's row with an ACID event object and
    * makes the relevant translations.
+   * 
+   * Running multiple Insert statements on the same partition (of a non-acid table) creates files
+   * like 000000_0, 000000_0_copy_1, 000000_0_copy_2, etc.  So the OriginalReaderPair must treat all
+   * of these files as part of a single logical bucket file.
+   * 
+   * For Compaction, where each split includes the whole bucket, this means reading over all the
+   * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.
+   *
+   * For a read after the table is marked transactional but before it's rewritten into a base/
+   * by compaction, each of the original files may be split into many pieces.  For each split we
+   * must make sure to include only the relevant part of each delta file.
+   * {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each
+   * split of the original file and used to filter rows from all the deltas.  The ROW__ID.rowid for
+   * the rows of the 'original' file must, of course, be assigned from the beginning of the logical
+   * bucket.
    */
   static final class OriginalReaderPair extends ReaderPair {
+    private final Options mergerOptions;
+    /**
+     * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list
+     */
+    private long rowIdOffset = 0;
+    /**
+     * See {@link AcidUtils.Directory#getOriginalFiles()}.  This list has a fixed sort order.  This
+     * is the full list when compacting and empty when doing a simple read.  The latter is because we
+     * only need to read the current split from one file for a simple read.
+     */
+    private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
+    /**
+     * index into {@link #originalFiles}
+     */
+    private int nextFileIndex = 0;
+    private long numRowsInCurrentFile = 0;
+    private RecordReader originalFileRecordReader = null;
+    private final Configuration conf;
+    private final Reader.Options options;
+    private final RecordIdentifier minKey;//shadow parent minKey to make final
+    private final RecordIdentifier maxKey;//shadow parent maxKey to make final
+
     OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
-                       RecordIdentifier minKey, RecordIdentifier maxKey,
-                       Reader.Options options) throws IOException {
+                       final RecordIdentifier minKey, final RecordIdentifier maxKey,
+                       Reader.Options options, Options mergerOptions, Configuration conf,
+                       ValidTxnList validTxnList) throws IOException {
       super(key, reader, bucket, minKey, maxKey, options, 0);
+      this.mergerOptions = mergerOptions;
+      this.conf = conf;
+      this.options = options;
+      assert mergerOptions.getRootPath() != null : "Since we have original files";
+      assert bucket >= 0 : "don't support non-bucketed tables yet";
+
+      RecordIdentifier newMinKey = minKey;
+      RecordIdentifier newMaxKey = maxKey;
+      if(mergerOptions.isCompacting()) {
+        {
+          //when compacting each split needs to process the whole logical bucket
+          assert options.getOffset() == 0;
+          assert options.getMaxOffset() == Long.MAX_VALUE;
+          assert minKey == null;
+          assert maxKey == null;
+        }
+        AcidUtils.Directory directoryState = AcidUtils.getAcidState(
+          mergerOptions.getRootPath(), conf, validTxnList, false, true);
+        originalFiles = directoryState.getOriginalFiles();
+        assert originalFiles.size() > 0;
+        /**
+         * When there are no copy_N files, the {@link #recordReader} will be the one and only
+         * file for 'bucket', but closing it here makes the flow cleaner and only happens once in the
+         * life of the table.  With copy_N files, the caller may pass in any one of the copy_N files.
+         * This is less prone to bugs than expecting the caller to pass in a Reader for the 1st file
+         * of a logical bucket.*/
+        recordReader.close();
+        reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+        if(reader == null) {
+          //Compactor generated a split for a bucket that has no data?
+          throw new IllegalStateException("No 'original' files found for bucketId=" + bucket +
+            " in " + mergerOptions.getRootPath());
+        }
+        numRowsInCurrentFile = reader.getNumberOfRows();
+        originalFileRecordReader = reader.rowsOptions(options);
+      }
+      else {
+        /**
+         * Logically each bucket consists of 000000_0, 000000_0_copy_1 ... 000000_0_copy_N, etc.  We don't
+         * know N a priori; this flag tracks whether the current split comes from the last file of
+         * the logical bucket.  It's needed to correctly set maxKey.  In particular, set maxKey==null if this split
+         * is the tail of the last file for this logical bucket, to include all deltas written after
+         * non-acid to acid table conversion.
+         */
+        boolean isLastFileForThisBucket = false;
+        boolean haveSeenCurrentFile = false;
+        originalFiles = Collections.emptyList();
+        if (mergerOptions.getCopyIndex() > 0) {
+          //the split is from something other than the 1st file of the logical bucket - compute offset
+          
+          AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+            conf, validTxnList, false, true);
+          for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+            AcidOutputFormat.Options bucketOptions =
+              AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+            if (bucketOptions.getBucket() != bucket) {
+              continue;
+            }
+            if(haveSeenCurrentFile) {
+              //if here we already saw current file and now found another file for the same bucket
+              //so the current file is not the last file of the logical bucket
+              isLastFileForThisBucket = false;
+              break;
+            }
+            if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
+              /**
+               * found the file the current split comes from, so we're done
+               * counting {@link #rowIdOffset}
+               */
+              haveSeenCurrentFile = true;
+              isLastFileForThisBucket = true;
+              continue;
+            }
+            Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
+              OrcFile.readerOptions(conf));
+            rowIdOffset += copyReader.getNumberOfRows();
+          }
+          if (rowIdOffset > 0) {
+            //rowIdOffset could be 0 if all files before current one are empty
+            /**
+             * Since we have already run {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
+             * int, Reader.Options)}, we need to fix the min/max key here since these are used by
+             * {@link #next(OrcStruct)}, which uses {@link #rowIdOffset} to generate the rowId for
+             * the key.  */
+            if (minKey != null) {
+              minKey.setRowId(minKey.getRowId() + rowIdOffset);
+            }
+            else {
+              /**
+               * If this is not the 1st file, set minKey to 1 less than the start of the current file.
+               * (We would not need to set minKey if we knew that there are no delta files.)
+               * {@link #advanceToMinKey()} needs this. */
+              newMinKey = new RecordIdentifier(0, bucket, rowIdOffset - 1);
+            }
+            if (maxKey != null) {
+              maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
+            }
+          }
+        } else {
+          isLastFileForThisBucket = true;
+          AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+            conf, validTxnList, false, true);
+          int numFilesInBucket= 0;
+          for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+            AcidOutputFormat.Options bucketOptions =
+              AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+            if (bucketOptions.getBucket() == bucket) {
+              numFilesInBucket++;
+              if(numFilesInBucket > 1) {
+                isLastFileForThisBucket = false;
+                break;
+              }
+            }
+          }
+        }
+        originalFileRecordReader = recordReader;
+        if(!isLastFileForThisBucket && maxKey == null) {
+          /*
+           * If this is the last file for this bucket, maxKey == null means the split is the tail
+           * of the file, so we want to leave it null to make sure any insert events in delta
+           * files are included; conversely, if it's not the last file, set the maxKey so that
+           * events from deltas that don't modify anything in the current split are excluded. */
+          newMaxKey = new RecordIdentifier(0, bucket,
+            rowIdOffset + reader.getNumberOfRows() - 1);
+        }
+      }
+      this.minKey = newMinKey;
+      this.maxKey = newMaxKey;
     }
-
-    @Override
-    void next(OrcStruct next) throws IOException {
-      if (recordReader.hasNext()) {
-        long nextRowId = recordReader.getRowNumber();
+    @Override RecordReader getRecordReader() {
+      return originalFileRecordReader;
+    }
+    @Override RecordIdentifier getMinKey() {
+      return minKey;
+    }
+    @Override RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
+    private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
+      if (originalFileRecordReader.hasNext()) {
+        //RecordReader.getRowNumber() produces a file-global row number even with PPD
+        long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset;
         // have to do initialization here, because the super's constructor
         // calls next and thus we need to initialize before our constructor
         // runs
@@ -282,7 +479,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
               new LongWritable(nextRowId));
           nextRecord.setFieldValue(OrcRecordUpdater.ROW,
-              recordReader.next(null));
+              originalFileRecordReader.next(null));
         } else {
           nextRecord = next;
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
@@ -296,20 +493,62 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
               .set(nextRowId);
           nextRecord.setFieldValue(OrcRecordUpdater.ROW,
-              recordReader.next(OrcRecordUpdater.getRow(next)));
+              originalFileRecordReader.next(OrcRecordUpdater.getRow(next)));
         }
         key.setValues(0L, bucket, nextRowId, 0L, 0);
         if (maxKey != null && key.compareRow(maxKey) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + maxKey);
           }
-          nextRecord = null;
-          recordReader.close();
+          return false;//reached End Of Split
         }
-      } else {
-        nextRecord = null;
-        recordReader.close();
+        return true;
       }
+      return false;//reached EndOfFile
+    }
+    @Override
+    void next(OrcStruct next) throws IOException {
+      assert advancedToMinKey : "advanceToMinKey() was not called";
+      while(true) {
+        if(nextFromCurrentFile(next)) {
+          return;
+        } else {
+          if (originalFiles.size() <= nextFileIndex) {
+            //no more original files to read
+            nextRecord = null;
+            originalFileRecordReader.close();
+            return;
+          } else {
+            assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting";
+            rowIdOffset += numRowsInCurrentFile;
+            originalFileRecordReader.close();
+            Reader reader = advanceToNextFile();
+            if(reader == null) {
+              nextRecord = null;
+              return;
+            }
+            numRowsInCurrentFile = reader.getNumberOfRows();
+            originalFileRecordReader = reader.rowsOptions(options);
+          }
+        }
+      }
+    }
+    /**
+     * Finds the next file of the logical bucket
+     * @return {@code null} if there are no more files
+     */
+    private Reader advanceToNextFile() throws IOException {
+      while(nextFileIndex < originalFiles.size()) {
+        AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
+        if (bucketOptions.getBucket() == bucket) {
+          break;
+        }
+        nextFileIndex++;
+      }
+      if(originalFiles.size() <= nextFileIndex) {
+        return null;//no more files for current bucket
+      }
+      return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf));
     }
 
     @Override
@@ -318,6 +557,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
   }
 
+  /**
+   * The process here reads several files (base + some deltas), each of which is sorted on
+   * {@link ReaderKey} ascending.  The output of this Reader should be in global order across these
+   * files.  The root of this tree is always the next 'file' to read from.
+   */
   private final TreeMap<ReaderKey, ReaderPair> readers =
       new TreeMap<ReaderKey, ReaderPair>();
 
@@ -327,6 +571,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   // The key of the next lowest reader.
   private ReaderKey secondaryKey = null;
 
+  private static final class KeyInterval {
+    private final RecordIdentifier minKey;
+    private final RecordIdentifier maxKey;
+    private KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) {
+      this.minKey = minKey;
+      this.maxKey = maxKey;
+    }
+    private RecordIdentifier getMinKey() {
+      return minKey;
+    }
+    private RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
+  }
   /**
    * Find the key range for original bucket files.
    * @param reader the reader
@@ -334,14 +592,24 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * @param options the options for reading with
    * @throws IOException
    */
-  private void discoverOriginalKeyBounds(Reader reader, int bucket,
+  private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
                                          Reader.Options options
                                          ) throws IOException {
     long rowLength = 0;
     long rowOffset = 0;
-    long offset = options.getOffset();
-    long maxOffset = options.getMaxOffset();
+    long offset = options.getOffset();//this would usually be at block boundary
+    long maxOffset = options.getMaxOffset();//this would usually be at block boundary
     boolean isTail = true;
+    RecordIdentifier minKey = null;
+    RecordIdentifier maxKey = null;
+   /**
+    * options.getOffset() and getMaxOffset() would usually be at a block boundary, which doesn't
+    * necessarily match a stripe boundary.  So we want minKey to be one before the 1st
+    * row of the first stripe that starts after getOffset(), and maxKey to be the last row of the
+    * stripe that contains getMaxOffset().  This breaks if getOffset() and getMaxOffset() are inside
+    * the same stripe - in this case we have minKey & isTail=false but rowLength is never set.
+    * (HIVE-16953)
+    */
     for(StripeInformation stripe: reader.getStripes()) {
       if (offset > stripe.getOffset()) {
         rowOffset += stripe.getNumberOfRows();
@@ -358,6 +626,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     if (!isTail) {
       maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
     }
+    return new KeyInterval(minKey, maxKey);
   }
 
   /**
@@ -366,7 +635,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * @param options the options for reading with
    * @throws IOException
    */
-  private void discoverKeyBounds(Reader reader,
+  private KeyInterval discoverKeyBounds(Reader reader,
                                  Reader.Options options) throws IOException {
     RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
     long offset = options.getOffset();
@@ -374,6 +643,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     int firstStripe = 0;
     int stripeCount = 0;
     boolean isTail = true;
+    RecordIdentifier minKey = null;
+    RecordIdentifier maxKey = null;
+    
     List<StripeInformation> stripes = reader.getStripes();
     for(StripeInformation stripe: stripes) {
       if (offset > stripe.getOffset()) {
@@ -391,6 +663,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     if (!isTail) {
       maxKey = keyIndex[firstStripe + stripeCount - 1];
     }
+    return new KeyInterval(minKey, maxKey);
   }
 
   /**
@@ -416,6 +689,49 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     return result;
   }
 
+  static class Options {
+    private int copyIndex = 0;
+    private boolean isCompacting = false;
+    private Path bucketPath;
+    private Path rootPath;
+    Options copyIndex(int copyIndex) {
+      assert copyIndex >= 0;
+      this.copyIndex = copyIndex;
+      return this;
+    }
+    Options isCompacting(boolean isCompacting) {
+      this.isCompacting = isCompacting;
+      return this;
+    }
+    Options bucketPath(Path bucketPath) {
+      this.bucketPath = bucketPath;
+      return this;
+    }
+    Options rootPath(Path rootPath) {
+      this.rootPath = rootPath;
+      return this;
+    }
+    /**
+     * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
+     */
+    int getCopyIndex() {
+      return copyIndex;
+    }
+    boolean isCompacting() {
+      return isCompacting;
+    }
+    /**
+     * Full path to the data file
+     * @return
+     */
+    Path getBucketPath() {
+      return bucketPath;
+    }
+    /**
+     * Partition folder (Table folder if not partitioned)
+     */
+    Path getRootPath()  { return rootPath; }
+  }
   /**
    * Create a reader that merge sorts the ACID events together.
    * @param conf the configuration
@@ -433,7 +749,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                      int bucket,
                      ValidTxnList validTxnList,
                      Reader.Options options,
-                     Path[] deltaDirectory) throws IOException {
+                     Path[] deltaDirectory, Options mergerOptions) throws IOException {
     this.conf = conf;
     this.collapse = collapseEvents;
     this.offset = options.getOffset();
@@ -450,32 +766,36 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     Reader.Options eventOptions = createEventOptions(options);
     if (reader == null) {
       baseReader = null;
+      minKey = maxKey = null;
     } else {
-
-      // find the min/max based on the offset and length
+      KeyInterval keyInterval;
+      // find the min/max based on the offset and length (and more for 'original')
       if (isOriginal) {
-        discoverOriginalKeyBounds(reader, bucket, options);
+        keyInterval = discoverOriginalKeyBounds(reader, bucket, options);
       } else {
-        discoverKeyBounds(reader, options);
+        keyInterval = discoverKeyBounds(reader, options);
       }
-      LOG.info("min key = " + minKey + ", max key = " + maxKey);
+      LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // use the min/max instead of the byte range
       ReaderPair pair;
       ReaderKey key = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
-        pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey,
-                                      options);
+        pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+                                      options, mergerOptions, conf, validTxnList);
       } else {
-        pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
+        pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
                               eventOptions, 0);
       }
-
+      minKey = pair.getMinKey();
+      maxKey = pair.getMaxKey();
+      LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
+      pair.advanceToMinKey();
       // if there is at least one record, put it in the map
       if (pair.nextRecord != null) {
         readers.put(key, pair);
       }
-      baseReader = pair.recordReader;
+      baseReader = pair.getRecordReader();
     }
 
     // we always want to read all of the deltas
@@ -504,6 +824,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ReaderPair deltaPair;
           deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
             maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+          deltaPair.advanceToMinKey();
           if (deltaPair.nextRecord != null) {
             readers.put(key, deltaPair);
           }
@@ -648,6 +969,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
   @Override
   public float getProgress() throws IOException {
+    //this is not likely to do the right thing for Compaction of "original" files when there are copyN files
     return baseReader == null ? 1 : baseReader.getProgress();
   }
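
To make the rowid bookkeeping in OriginalReaderPair concrete, here is a tiny standalone sketch (class name, file names, and row counts are invented) of how rowIdOffset yields one continuous ROW__ID.rowid sequence across a logical bucket made of the original file plus its copy_N files:

import java.util.LinkedHashMap;
import java.util.Map;

public class RowIdOffsetSketch {
  public static void main(String[] args) {
    // One logical bucket: file name -> row count, in the fixed sort order
    // of AcidUtils.Directory.getOriginalFiles() (names and counts invented).
    Map<String, Long> filesInSortOrder = new LinkedHashMap<>();
    filesInSortOrder.put("000000_0", 3L);
    filesInSortOrder.put("000000_0_copy_1", 2L);
    filesInSortOrder.put("000000_0_copy_2", 4L);

    long rowIdOffset = 0;  // sum of rows in all earlier files, as in OriginalReaderPair
    for (Map.Entry<String, Long> f : filesInSortOrder.entrySet()) {
      System.out.println(f.getKey() + " -> ROW__ID.rowid range ["
          + rowIdOffset + ", " + (rowIdOffset + f.getValue() - 1) + "]");
      rowIdOffset += f.getValue();
    }
    // prints [0, 2], [3, 4], [5, 8]: one continuous sequence per logical bucket
  }
}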
 

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 75c7680..29f5a8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io.orc;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -42,9 +41,6 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
@@ -360,8 +356,11 @@ public class VectorizedOrcAcidRowBatchReader
           int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
           String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
           this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+          OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
+          assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
           this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
-                                                      validTxnList, readerOptions, deleteDeltas);
+                                                      validTxnList, readerOptions, deleteDeltas,
+                                                      mergerOptions);
           this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
           this.deleteRecordValue = this.deleteRecords.createValue();
           // Initialize the first value in the delete reader.

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 73710a7..7019f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3129,7 +3129,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     * I'll leave the below loop for now until a better approach is found.
     */
     for (int counter = 1; destFs.exists(destFilePath); counter++) {
-      destFilePath =  new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+      destFilePath =  new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : ""));
     }
 
     if (isRenameAllowed) {
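
A minimal standalone sketch of the naming loop in Hive.mvFile() that now uses the shared COPY_KEYWORD constant; the Set stands in for destFs.exists(), and the class and names are invented:

import java.util.HashSet;
import java.util.Set;

public class CopyNameSketch {
  private static final String COPY_KEYWORD = "_copy_";  // Utilities.COPY_KEYWORD

  /** Picks the next free name the way Hive.mvFile does: name, then name_copy_1, name_copy_2, ... */
  static String nextFreeName(Set<String> existing, String name, String type) {
    String suffix = type.isEmpty() ? "" : "." + type;
    String candidate = name + suffix;
    for (int counter = 1; existing.contains(candidate); counter++) {
      candidate = name + COPY_KEYWORD + counter + suffix;
    }
    return candidate;
  }

  public static void main(String[] args) {
    Set<String> existing = new HashSet<>();
    existing.add("000000_0");
    existing.add("000000_0_copy_1");
    System.out.println(nextFreeName(existing, "000000_0", ""));  // prints 000000_0_copy_2
  }
}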

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f83b6db..bafed9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -251,6 +251,7 @@ public class CompactorMR {
           // There are original format files
           for (HdfsFileStatusWithId stat : originalFiles) {
             Path path = stat.getFileStatus().getPath();
+            //note that originalFiles holds all original files recursively, not dirs
             dirsToSearch.add(path);
             LOG.debug("Adding original file " + path + " to dirs to search");
           }
@@ -275,6 +276,12 @@ public class CompactorMR {
 
     su.gatherStats();
   }
+
+  /**
+   * @param baseDir if not null, it's either the table/partition root folder or base_xxxx.
+   *                If it's base_xxxx, it's in dirsToSearch; else the actual original files
+   *                (all leaves, recursively) are in the dirsToSearch list
+   */
   private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
                                    StringableList dirsToSearch,
                                    List<AcidUtils.ParsedDelta> parsedDeltas,
@@ -363,7 +370,7 @@ public class CompactorMR {
      * @param hadoopConf
      * @param bucket bucket to be processed by this split
      * @param files actual files this split should process.  It is assumed the caller has already
-     *              parsed out the files in base and deltas to populate this list.
+     *              parsed out the files in base and deltas to populate this list.  Includes copy_N files.
      * @param base directory of the base, or the partition/table location if the files are in old
      *             style.  Can be null.
      * @param deltas directories of the delta files.

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 7c66955..5fb89d0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -105,6 +105,7 @@ public class TestTxnCommands {
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
     hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf
     .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
@@ -880,4 +881,55 @@ public class TestTxnCommands {
     runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)");
     runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))");
   }
+  /**
+   * HIVE-16177
+   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}
+   */
+  @Test
+  public void testNonAcidToAcidConversion01() throws Exception {
+    //create 1 row in a file 000001_0 (and an empty 000000_0)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    //create 1 row in a file 000000_0_copy_1 and 1 row in a file 000001_0_copy_1
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)");
+
+    //convert the table to Acid
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    //create a delta directory
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)");
+
+    //make sure we assign correct Ids
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " +  Table.NONACIDORCTBL + " order by ROW__ID");
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("", 4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+    //run Compaction
+    runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+    LOG.warn("after compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("", 4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+
+    //make sure they are the same before and after compaction
+  }
 }
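
The assertions in testNonAcidToAcidConversion01 encode how ROW__IDs are synthesized
for pre-acid data: the transactionid is 0, the bucketid comes from the leading digits
of the original file name, and rowids are assigned sequentially within a bucket across
000000_0, 000000_0_copy_1, 000000_0_copy_2 and so on.  The standalone sketch below
pulls the bucket id and copy index out of such a name; the regex is a simplified
stand-in for the parsing done in Utilities/AcidUtils, not the actual pattern.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OriginalFileNameSketch {
  // Simplified stand-in: captures the bucket id and an optional _copy_N index
  // from an original (pre-acid) file name.
  private static final Pattern ORIGINAL_WITH_COPY =
      Pattern.compile("^([0-9]+)_[0-9]+(?:_copy_([0-9]+))?$");

  public static void main(String[] args) {
    for (String name : new String[]{"000001_0", "000000_0_copy_1", "000001_0_copy_2"}) {
      Matcher m = ORIGINAL_WITH_COPY.matcher(name);
      if (m.matches()) {
        int bucket = Integer.parseInt(m.group(1));
        int copy = m.group(2) == null ? 0 : Integer.parseInt(m.group(2));
        System.out.println(name + " -> bucket " + bucket + ", copy " + copy);
      }
    }
  }
}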

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5786c4f..31921f1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -319,6 +319,68 @@ public class TestTxnCommands2 {
     resultData = new int[][] {{3,8}, {5,6}, {9,20}};
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
+  /**
+   * see HIVE-16177
+   * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
+   */
+  @Test
+  public void testNonAcidToAcidConversion02() throws Exception {
+    //create 2 rows in a file 000001_0 (and an empty 000000_0)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)");
+    //create 2 rows in a file 000000_0_copy_1 and 2 rows in a file 000001_0_copy_1
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
+    //create 1 row in a file 000001_0_copy_2 (and possibly an empty 000000_0_copy_2)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
+
+    //convert the table to Acid
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
+    //create some delta directories
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where a = 0 and b = 12");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)");
+    runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 and b = 3");
+
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " +  Table.NONACIDORCTBL + " order by a,b");
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    /*
+     * All ROW__IDs are unique on read after conversion to acid.
+     * ROW__IDs are exactly the same before and after compaction.
+     * Also check the file name after compaction for completeness.
+     */
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13",  "bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":0,\"rowid\":0}\t0\t15", "bucket_00000"},
+      {"{\"transactionid\":20,\"bucketid\":0,\"rowid\":0}\t0\t17", "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6",   "bucket_00001"},
+      {"{\"transactionid\":18,\"bucketid\":1,\"rowid\":0}\t1\t16", "bucket_00001"}
+    };
+    Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+    }
+    //run Compaction
+    runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
+    LOG.warn("after compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //make sure they are the same before and after compaction
+  }
 
   /**
    * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
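
The block comment in testNonAcidToAcidConversion02 captures the invariant this change
is really about: every row gets a unique synthetic ROW__ID after conversion, and those
ROW__IDs must survive a major compaction unchanged.  A minimal, hypothetical
JUnit-style helper for stating that invariant directly is sketched below; the name
assertRowIdsStable and the tab-splitting convention are assumptions for illustration,
not part of this patch.

import java.util.List;
import org.junit.Assert;

public class RowIdStabilitySketch {
  // Given the "select ROW__ID, a, b, INPUT__FILE__NAME ... order by a,b" output
  // captured before and after compaction, assert that the ROW__ID prefix of each
  // row (everything before the first tab) is unchanged.
  static void assertRowIdsStable(List<String> before, List<String> after) {
    Assert.assertEquals("row count changed by compaction", before.size(), after.size());
    for (int i = 0; i < before.size(); i++) {
      String beforeId = before.get(i).split("\t", 2)[0];
      String afterId = after.get(i).split("\t", 2)[0];
      Assert.assertEquals("ROW__ID changed for row " + i, beforeId, afterId);
    }
  }
}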

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index a7ff9a3..c928732 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
@@ -142,6 +143,10 @@ public class TestAcidUtils {
     Configuration conf = new Configuration();
     MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/tbl/part1/000000_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1",
+          500, new byte[0]),
+        new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2",
+          500, new byte[0]),
         new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/000002_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/random", 500, new byte[0]),
@@ -154,13 +159,17 @@ public class TestAcidUtils {
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
     List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
-    assertEquals(5, result.size());
+    assertEquals(7, result.size());
     assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
-    assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
-    assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
-    assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1",
+      result.get(1).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2",
+      result.get(2).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000001_1", result.get(3).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000002_0", result.get(4).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/random", result.get(5).getFileStatus().getPath().toString());
     assertEquals("mock:/tbl/part1/subdir/000000_0",
-        result.get(4).getFileStatus().getPath().toString());
+        result.get(6).getFileStatus().getPath().toString());
   }
 
   @Test
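
The updated expectations show getOriginalFiles() now returning the _copy_N files
alongside their base names, and the order asserted above happens to coincide with
plain lexicographic ordering of the full paths ("000000_0" sorts before
"000000_0_copy_1", which sorts before "000001_1").  The snippet below only
demonstrates that ordering with a plain string sort; it is not the listing logic
AcidUtils actually uses.

import java.util.Arrays;
import java.util.List;

public class OriginalFileOrderSketch {
  public static void main(String[] args) {
    // Shuffled input; a lexicographic sort reproduces the order asserted in the test.
    List<String> paths = Arrays.asList(
        "mock:/tbl/part1/random",
        "mock:/tbl/part1/000002_0",
        "mock:/tbl/part1/000000_0_copy_2",
        "mock:/tbl/part1/subdir/000000_0",
        "mock:/tbl/part1/000000_0",
        "mock:/tbl/part1/000001_1",
        "mock:/tbl/part1/000000_0_copy_1");
    paths.sort(String::compareTo);
    paths.forEach(System.out::println);
  }
}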

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index bb79857..43ed238 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -910,7 +910,7 @@ public class TestInputOutputFormat {
       MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
     return OrcInputFormat.determineSplitStrategies(combineCtx, context,
-        adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
         null, null, true);
   }
 
@@ -924,7 +924,7 @@ public class TestInputOutputFormat {
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
         null, null, true);
   }
 
@@ -3397,7 +3397,9 @@ public class TestInputOutputFormat {
     // call-2: open to read data - split 1 => mock:/mocktable5/0_0
     // call-3: open to read footer - split 2 => mock:/mocktable5/0_1
     // call-4: open to read data - split 2 => mock:/mocktable5/0_1
-    assertEquals(4, readOpsDelta);
+    // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_0
+    // call-6: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_1
+    assertEquals(6, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3470,7 +3472,9 @@ public class TestInputOutputFormat {
     }
     // call-1: open to read data - split 1 => mock:/mocktable6/0_0
     // call-2: open to read data - split 2 => mock:/mocktable6/0_1
-    assertEquals(2, readOpsDelta);
+    // call-3: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_0
+    // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_1
+    assertEquals(4, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3548,7 +3552,8 @@ public class TestInputOutputFormat {
     // call-2: open to read data - split 1 => mock:/mocktable7/0_0
     // call-3: open side file (flush length) of delta directory
     // call-4: fs.exists() check for delta_xxx_xxx/bucket_00000 file
-    assertEquals(4, readOpsDelta);
+    // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable7/0_0
+    assertEquals(5, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3625,7 +3630,8 @@ public class TestInputOutputFormat {
     // call-1: open to read data - split 1 => mock:/mocktable8/0_0
     // call-2: open side file (flush length) of delta directory
     // call-3: fs.exists() check for delta_xxx_xxx/bucket_00000 file
-    assertEquals(3, readOpsDelta);
+    // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable8/0_0
+    assertEquals(4, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 1ce1bfb..584bd3b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -189,6 +189,7 @@ public class TestOrcRawRecordMerger {
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
     ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
         new Reader.Options(), 0);
+    pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -215,6 +216,7 @@ public class TestOrcRawRecordMerger {
 
     ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
         new Reader.Options(), 0);
+    pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -290,8 +292,14 @@ public class TestOrcRawRecordMerger {
     RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
     RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
     boolean[] includes = new boolean[]{true, true};
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testOriginalReaderPair");
+    fs.makeQualified(root);
+    fs.create(root);
     ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
-        new Reader.Options().include(includes));
+        new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+    pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
@@ -319,8 +327,14 @@ public class TestOrcRawRecordMerger {
   public void testOriginalReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockOriginalReader();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
+    fs.makeQualified(root);
+    fs.create(root);
     ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
-        new Reader.Options());
+        new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+    pair.advnaceToMinKey();
     assertEquals("first", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
@@ -423,7 +437,7 @@ public class TestOrcRawRecordMerger {
 
     OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
         false, 10, createMaximalTxnList(),
-        new Reader.Options().range(1000, 1000), null);
+        new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
     RecordReader rr = merger.getCurrentReader().recordReader;
     assertEquals(0, merger.getOtherReaders().size());
 
@@ -531,7 +545,7 @@ public class TestOrcRawRecordMerger {
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
     RecordIdentifier key = merger.createKey();
     OrcStruct value = merger.createValue();
     assertEquals(false, merger.next(key, value));
@@ -606,7 +620,7 @@ public class TestOrcRawRecordMerger {
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
     assertEquals(null, merger.getMinKey());
     assertEquals(null, merger.getMaxKey());
     RecordIdentifier id = merger.createKey();
@@ -681,7 +695,7 @@ public class TestOrcRawRecordMerger {
     // make a merger that doesn't collapse events
     merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -780,7 +794,7 @@ public class TestOrcRawRecordMerger {
     merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             txns, new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
     for(int i=0; i < values.length; ++i) {
       assertEquals(true, merger.next(id, event));
       LOG.info("id = " + id + "event = " + event);

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 6bf1312..73bc1ab 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -182,7 +182,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
     OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
     OrcInputFormat.AcidDirInfo adi = gen.call();
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
         null, null, true);
     assertEquals(1, splitStrategies.size());
     List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/queries/clientpositive/insert_orig_table.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_orig_table.q b/ql/src/test/queries/clientpositive/insert_orig_table.q
index de408d9..544fe11 100644
--- a/ql/src/test/queries/clientpositive/insert_orig_table.q
+++ b/ql/src/test/queries/clientpositive/insert_orig_table.q
@@ -1,9 +1,23 @@
-set hive.strict.checks.bucketing=false;
-
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 
+drop table if exists acid_iot_stage;
+create table acid_iot_stage(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN) stored as orc;
+LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage;
+
 create table acid_iot(
     ctinyint TINYINT,
     csmallint SMALLINT,
@@ -18,7 +32,7 @@ create table acid_iot(
     cboolean1 BOOLEAN,
     cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true');
 
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot;
+insert into acid_iot select * from acid_iot_stage;
 
 select count(*) from acid_iot;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/queries/clientpositive/insert_values_orig_table.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_values_orig_table.q b/ql/src/test/queries/clientpositive/insert_values_orig_table.q
index db4fc82..1319a0a 100644
--- a/ql/src/test/queries/clientpositive/insert_values_orig_table.q
+++ b/ql/src/test/queries/clientpositive/insert_values_orig_table.q
@@ -1,9 +1,23 @@
-set hive.strict.checks.bucketing=false;
-
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 
+drop table if exists acid_ivot_stage;
+create table acid_ivot_stage(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN) stored as orc;
+LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot_stage;
+
 create table acid_ivot(
     ctinyint TINYINT,
     csmallint SMALLINT,
@@ -18,7 +32,7 @@ create table acid_ivot(
     cboolean1 BOOLEAN,
     cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true');
 
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
+insert into acid_ivot select * from acid_ivot_stage;
 
 select count(*) from acid_ivot;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q b/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
index 49c5b0a..2f366fb 100644
--- a/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
+++ b/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
@@ -1,10 +1,25 @@
-set hive.strict.checks.bucketing=false;
-
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 set hive.compute.query.using.stats=true;
 
+drop table if exists acid_ivot_stage;
+create table acid_ivot_stage(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN) stored as orc;
+
+LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot_stage;
+
 create table acid_ivot(
     ctinyint TINYINT,
     csmallint SMALLINT,
@@ -21,7 +36,7 @@ create table acid_ivot(
 
 desc formatted acid_ivot;
 
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
+insert into acid_ivot select * from acid_ivot_stage;
 
 desc formatted acid_ivot;
 
@@ -71,7 +86,7 @@ explain select count(*) from acid_ivot;
 
 select count(*) from acid_ivot;
 
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
+insert into acid_ivot select * from acid_ivot_stage;
 
 desc formatted acid_ivot;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/results/clientpositive/insert_orig_table.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/insert_orig_table.q.out b/ql/src/test/results/clientpositive/insert_orig_table.q.out
index 5eea74d..a62e0ec 100644
--- a/ql/src/test/results/clientpositive/insert_orig_table.q.out
+++ b/ql/src/test/results/clientpositive/insert_orig_table.q.out
@@ -1,3 +1,47 @@
+PREHOOK: query: drop table if exists acid_iot_stage
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists acid_iot_stage
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table acid_iot_stage(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_iot_stage
+POSTHOOK: query: create table acid_iot_stage(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_iot_stage
+PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@acid_iot_stage
+POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@acid_iot_stage
 PREHOOK: query: create table acid_iot(
     ctinyint TINYINT,
     csmallint SMALLINT,
@@ -30,14 +74,26 @@ POSTHOOK: query: create table acid_iot(
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@acid_iot
-PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot
-PREHOOK: type: LOAD
-#### A masked pattern was here ####
+PREHOOK: query: insert into acid_iot select * from acid_iot_stage
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_iot_stage
 PREHOOK: Output: default@acid_iot
-POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot
-POSTHOOK: type: LOAD
-#### A masked pattern was here ####
+POSTHOOK: query: insert into acid_iot select * from acid_iot_stage
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_iot_stage
 POSTHOOK: Output: default@acid_iot
+POSTHOOK: Lineage: acid_iot.cbigint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cboolean1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cboolean2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cdouble SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cfloat SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: acid_iot.csmallint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cstring1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cstring2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: acid_iot.ctimestamp1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: acid_iot.ctimestamp2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: acid_iot.ctinyint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
 PREHOOK: query: select count(*) from acid_iot
 PREHOOK: type: QUERY
 PREHOOK: Input: default@acid_iot