You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2015/07/20 22:12:18 UTC

[08/50] [abbrv] hive git commit: HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)

HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66feedc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66feedc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66feedc5

Branch: refs/heads/spark
Commit: 66feedc5569de959a383e0a58d9e8768bbad0e2c
Parents: 5c94bda
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 13 09:11:28 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 13 09:11:28 2015 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         |   4 +-
 .../streaming/mutate/worker/MutatorImpl.java    |   4 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   1 +
 .../hadoop/hive/ql/io/AcidInputFormat.java      |  60 +++++++-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  49 +++++-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 152 +++++++++++++++----
 .../hadoop/hive/ql/io/HiveFileFormatUtils.java  |  19 +--
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  20 +--
 .../hadoop/hive/ql/io/orc/OrcNewSplit.java      |  13 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  66 ++++++--
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  58 +++++++
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |  16 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  20 ++-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   4 +
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   3 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   3 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  27 +++-
 .../hive/ql/txn/compactor/CompactorMR.java      |   4 +-
 .../hive/ql/exec/TestFileSinkOperator.java      |   3 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  73 ++++++++-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  13 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  57 ++++---
 .../hive/ql/io/orc/TestOrcRecordUpdater.java    |   6 +-
 .../hive/ql/txn/compactor/CompactorTest.java    |  20 ++-
 .../hive/ql/txn/compactor/TestCleaner.java      |   8 +-
 .../hive/ql/txn/compactor/TestCleaner2.java     |  14 ++
 .../hive/ql/txn/compactor/TestInitiator.java    |   4 +
 .../hive/ql/txn/compactor/TestWorker.java       |  49 +++---
 .../hive/ql/txn/compactor/TestWorker2.java      |  16 ++
 29 files changed, 645 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index ed46bca..c959222 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -143,7 +143,9 @@ abstract class AbstractRecordWriter implements RecordWriter {
                       .inspector(getSerde().getObjectInspector())
                       .bucket(bucketId)
                       .minimumTransactionId(minTxnId)
-                      .maximumTransactionId(maxTxnID));
+                      .maximumTransactionId(maxTxnID)
+                      .statementId(-1)
+                      .finalDestination(partitionPath));
     } catch (SerDeException e) {
       throw new SerializationError("Failed to get object inspector from Serde "
               + getSerde().getClass().getName(), e);

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
index 0fe41d5..52062f8 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
@@ -78,7 +78,9 @@ public class MutatorImpl implements Mutator {
             .bucket(bucketId)
             .minimumTransactionId(transactionId)
             .maximumTransactionId(transactionId)
-            .recordIdColumn(recordIdColumn));
+            .recordIdColumn(recordIdColumn)
+            .finalDestination(partitionPath)
+            .statementId(-1));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 934cb42..b74e5fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -988,6 +988,7 @@ public class Driver implements CommandProcessor {
         if (acidSinks != null) {
           for (FileSinkDesc desc : acidSinks) {
             desc.setTransactionId(txnId);
+            desc.setStatementId(txnMgr.getStatementId());
           }
         }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index e1d2395..24506b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -22,13 +22,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * The interface required for input formats that what to support ACID
@@ -62,7 +68,7 @@ import java.io.IOException;
  *   <li>New format -
  *     <pre>
  *        $partition/base_$tid/$bucket
- *                   delta_$tid_$tid/$bucket
+ *                   delta_$tid_$tid_$stid/$bucket
  *     </pre></li>
  * </ul>
  * <p>
@@ -71,6 +77,8 @@ import java.io.IOException;
  * stored sorted by the original transaction id (ascending), bucket (ascending),
  * row id (ascending), and current transaction id (descending). Thus the files
  * can be merged by advancing through the files in parallel.
+ * The stid is the unique id (within the transaction) of the statement that created
+ * this delta file.
  * <p>
  * The base files include all transactions from the beginning of time
  * (transaction id 0) to the transaction in the directory name. Delta
@@ -91,7 +99,7 @@ import java.io.IOException;
  *   For row-at-a-time processing, KEY can conveniently pass RowId into the operator
  *   pipeline.  For vectorized execution the KEY could perhaps represent a range in the batch.
  *   Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return
- *   {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidRecordReader} is defined
+ *   {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined
  *   to provide access to the RowId.  Other implementations of AcidInputFormat can use either
  *   mechanism.
  * </p>
@@ -101,6 +109,54 @@ import java.io.IOException;
 public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     extends InputFormat<KEY, VALUE>, InputFormatChecker {
 
+  /**
+   * Writable description of the delta dirs for one txn range: min/max txn id plus the
+   * statement ids of those dirs; the list is empty when the dir name has no statement id.
+   */
+  static final class DeltaMetaData implements Writable {
+    private long minTxnId;
+    private long maxTxnId;
+    private List<Integer> stmtIds;
+    public DeltaMetaData() {
+      this(0,0,new ArrayList<Integer>());
+    }
+    DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
+      this.minTxnId = minTxnId;
+      this.maxTxnId = maxTxnId;
+      this.stmtIds = stmtIds;
+    }
+    long getMinTxnId() {
+      return minTxnId;
+    }
+    long getMaxTxnId() {
+      return maxTxnId;
+    }
+    List<Integer> getStmtIds() {
+      return stmtIds;
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(minTxnId);
+      out.writeLong(maxTxnId);
+      //null check must come before size(); a null list is written as zero statement ids
+      List<Integer> ids = stmtIds == null ? Collections.<Integer>emptyList() : stmtIds;
+      out.writeInt(ids.size());
+      for(Integer id : ids) {
+        out.writeInt(id);
+      }
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      minTxnId = in.readLong();
+      maxTxnId = in.readLong();
+      int numStatements = in.readInt();
+      //always start from a fresh list so no stale or null ids survive a zero count
+      stmtIds = new ArrayList<>();
+      for(int i = 0; i < numStatements; i++) {
+        stmtIds.add(in.readInt());
+      }
+    }
+  }
   /**
    * Options for controlling the record readers.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 0d537e1..dd90a95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -39,7 +39,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
   /**
    * Options to control how the files are written
    */
-  public static class Options {
+  public static class Options implements Cloneable {
     private final Configuration configuration;
     private FileSystem fs;
     private ObjectInspector inspector;
@@ -53,7 +53,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private PrintStream dummyStream = null;
     private boolean oldStyle = false;
     private int recIdCol = -1;  // Column the record identifier is in, -1 indicates no record id
-
+    //unique within a transaction
+    private int statementId = 0;
+    private Path finalDestination;
     /**
      * Create the options object.
      * @param conf Use the given configuration
@@ -63,6 +65,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     }
 
     /**
+     * shallow clone
+     */
+    @Override
+    public Options clone() {
+      try {
+        return (Options)super.clone();
+      }
+      catch(CloneNotSupportedException ex) {
+        throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex);
+      }
+    }
+    /**
      * Use the given ObjectInspector for each record written.
      * @param inspector the inspector to use.
      * @return this
@@ -185,6 +199,31 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return this;
     }
 
+    /**
+     * @since 1.3.0
+     * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
+     * This is primarily needed for testing to make sure 1.3 code can still read files created
+     * by older code.  Also used by Compactor.
+     */
+    public Options statementId(int id) {
+      if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) {
+        throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId);
+      }
+      if(id < -1) {
+        throw new IllegalArgumentException("Illegal statementId value: " + id);
+      }
+      this.statementId = id;
+      return this;
+    }
+    /**
+     * @param p where the data for this operation will eventually end up;
+     *          basically table or partition directory in FS
+     */
+    public Options finalDestination(Path p) {
+      this.finalDestination = p;
+      return this;
+    }
+    
     public Configuration getConfiguration() {
       return configuration;
     }
@@ -236,6 +275,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     boolean getOldStyle() {
       return oldStyle;
     }
+    public int getStatementId() {
+      return statementId;
+    }
+    public Path getFinalDestination() {
+      return finalDestination;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2214733..c7e0780 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -67,6 +67,15 @@ public class AcidUtils {
   };
   public static final String BUCKET_DIGITS = "%05d";
   public static final String DELTA_DIGITS = "%07d";
+  /**
+   * 10K statements per tx.  Probably overkill ... since that many delta files
+   * would not be good for performance
+   */
+  public static final String STATEMENT_DIGITS = "%04d";
+  /**
+   * This must be in sync with {@link #STATEMENT_DIGITS}
+   */
+  public static final int MAX_STATEMENTS_PER_TXN = 10000;
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
   public static final PathFilter originalBucketFilter = new PathFilter() {
@@ -79,7 +88,7 @@ public class AcidUtils {
   private AcidUtils() {
     // NOT USED
   }
-  private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
+  private static final Log LOG = LogFactory.getLog(AcidUtils.class);
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
@@ -104,12 +113,23 @@ public class AcidUtils {
         BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
   }
 
-  private static String deltaSubdir(long min, long max) {
+  /**
+   * This is the format of the delta dir name prior to Hive 1.3.x
+   */
+  public static String deltaSubdir(long min, long max) {
     return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
         String.format(DELTA_DIGITS, max);
   }
 
   /**
+   * Each write statement in a transaction creates its own delta dir.
+   * @since 1.3.x
+   */
+  public static String deltaSubdir(long min, long max, int statementId) {
+    return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
+  }
+
+  /**
    * Create a filename for a bucket file.
    * @param directory the partition directory
    * @param options the options for writing the bucket
@@ -124,9 +144,15 @@ public class AcidUtils {
     } else if (options.isWritingBase()) {
       subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
           options.getMaximumTransactionId());
+    } else if(options.getStatementId() == -1) {
+      //when minor compaction runs, we collapse per statement delta files inside a single
+      //transaction so we no longer need a statementId in the file name
+      subdir = deltaSubdir(options.getMinimumTransactionId(),
+        options.getMaximumTransactionId());
     } else {
       subdir = deltaSubdir(options.getMinimumTransactionId(),
-          options.getMaximumTransactionId());
+        options.getMaximumTransactionId(),
+        options.getStatementId());
     }
     return createBucketFile(new Path(directory, subdir), options.getBucket());
   }
@@ -214,14 +240,24 @@ public class AcidUtils {
   }
 
   public static class ParsedDelta implements Comparable<ParsedDelta> {
-    final long minTransaction;
-    final long maxTransaction;
-    final FileStatus path;
+    private final long minTransaction;
+    private final long maxTransaction;
+    private final FileStatus path;
+    //-1 is for internal (getAcidState()) purposes and means the delta dir
+    //had no statement ID
+    private final int statementId;
 
+    /**
+     * for pre 1.3.x delta files
+     */
     ParsedDelta(long min, long max, FileStatus path) {
+      this(min, max, path, -1);
+    }
+    ParsedDelta(long min, long max, FileStatus path, int statementId) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
+      this.statementId = statementId;
     }
 
     public long getMinTransaction() {
@@ -236,6 +272,16 @@ public class AcidUtils {
       return path.getPath();
     }
 
+    public int getStatementId() {
+      return statementId == -1 ? 0 : statementId;
+    }
+
+    /**
+     * Compactions (Major/Minor) merge deltas/bases but deletion of old files
+     * happens in a different process; thus it's possible to have bases/deltas with
+     * overlapping txnId boundaries.  The sort order helps figure out the "best" set of files
+     * to use to get data.
+     */
     @Override
     public int compareTo(ParsedDelta parsedDelta) {
       if (minTransaction != parsedDelta.minTransaction) {
@@ -250,7 +296,22 @@ public class AcidUtils {
         } else {
           return -1;
         }
-      } else {
+      }
+      else if(statementId != parsedDelta.statementId) {
+        /**
+         * We want deltas after minor compaction (w/o statementId) to sort
+         * earlier so that getAcidState() considers compacted files (into larger ones) obsolete
+         * Before compaction, include deltas with all statementIds for a given txnId
+         * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory}
+         */
+        if(statementId < parsedDelta.statementId) {
+          return -1;
+        }
+        else {
+          return 1;
+        }
+      }
+      else {
         return path.compareTo(parsedDelta.path);
       }
     }
@@ -271,46 +332,72 @@ public class AcidUtils {
 
   /**
    * Convert the list of deltas into an equivalent list of begin/end
-   * transaction id pairs.
+   * transaction id pairs.  Assumes {@code deltas} is sorted.
    * @param deltas
    * @return the list of transaction ids to serialize
    */
-  public static List<Long> serializeDeltas(List<ParsedDelta> deltas) {
-    List<Long> result = new ArrayList<Long>(deltas.size() * 2);
-    for(ParsedDelta delta: deltas) {
-      result.add(delta.minTransaction);
-      result.add(delta.maxTransaction);
+  public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) {
+    List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
+    AcidInputFormat.DeltaMetaData last = null;
+    for(ParsedDelta parsedDelta : deltas) {
+      if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) {
+        last.getStmtIds().add(parsedDelta.getStatementId());
+        continue;
+      }
+      last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>());
+      result.add(last);
+      if(parsedDelta.statementId >= 0) {
+        last.getStmtIds().add(parsedDelta.getStatementId());
+      }
     }
     return result;
   }
 
   /**
    * Convert the list of begin/end transaction id pairs to a list of delta
-   * directories.
+   * directories.  Note that there may be multiple delta files for the exact same txn range starting
+   * with 1.3.x;
+   * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
    * @param root the root directory
    * @param deltas list of begin/end transaction id pairs
    * @return the list of delta paths
    */
-  public static Path[] deserializeDeltas(Path root, List<Long> deltas) {
-    int deltaSize = deltas.size() / 2;
-    Path[] result = new Path[deltaSize];
-    for(int i = 0; i < deltaSize; ++i) {
-      result[i] = new Path(root, deltaSubdir(deltas.get(i * 2),
-          deltas.get(i * 2 + 1)));
+  public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
+    List<Path> results = new ArrayList<Path>(deltas.size());
+    for(AcidInputFormat.DeltaMetaData dmd : deltas) {
+      if(dmd.getStmtIds().isEmpty()) {
+        results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+        continue;
+      }
+      for(Integer stmtId : dmd.getStmtIds()) {
+        results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+      }
     }
-    return result;
+    return results.toArray(new Path[results.size()]);
   }
 
-  static ParsedDelta parseDelta(FileStatus path) {
-    String filename = path.getPath().getName();
+  private static ParsedDelta parseDelta(FileStatus path) {
+    ParsedDelta p = parsedDelta(path.getPath());
+    return new ParsedDelta(p.getMinTransaction(),
+      p.getMaxTransaction(), path, p.statementId);
+  }
+  public static ParsedDelta parsedDelta(Path deltaDir) {
+    String filename = deltaDir.getName();
     if (filename.startsWith(DELTA_PREFIX)) {
       String rest = filename.substring(DELTA_PREFIX.length());
       int split = rest.indexOf('_');
+      int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
       long min = Long.parseLong(rest.substring(0, split));
-      long max = Long.parseLong(rest.substring(split + 1));
-      return new ParsedDelta(min, max, path);
+      long max = split2 == -1 ?
+        Long.parseLong(rest.substring(split + 1)) :
+        Long.parseLong(rest.substring(split + 1, split2));
+      if(split2 == -1) {
+        return new ParsedDelta(min, max, null);
+      }
+      int statementId = Integer.parseInt(rest.substring(split2 + 1));
+      return new ParsedDelta(min, max, null, statementId);
     }
-    throw new IllegalArgumentException(path + " does not start with " +
+    throw new IllegalArgumentException(deltaDir + " does not start with " +
                                        DELTA_PREFIX);
   }
 
@@ -407,15 +494,24 @@ public class AcidUtils {
 
     Collections.sort(working);
     long current = bestBaseTxn;
+    int lastStmtId = -1;
     for(ParsedDelta next: working) {
       if (next.maxTransaction > current) {
         // are any of the new transactions ones that we care about?
         if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
-            ValidTxnList.RangeResponse.NONE) {
+          ValidTxnList.RangeResponse.NONE) {
           deltas.add(next);
           current = next.maxTransaction;
+          lastStmtId = next.statementId;
         }
-      } else {
+      }
+      else if(next.maxTransaction == current && lastStmtId >= 0) {
+        //make sure to get all deltas within a single transaction;  multi-statement txn
+        //generate multiple delta files with the same txnId range
+        //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
+        deltas.add(next);
+      }
+      else {
         obsolete.add(next.path);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 7ad5aa0..50ba740 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -297,31 +297,32 @@ public final class HiveFileFormatUtils {
     // TODO not 100% sure about this.  This call doesn't set the compression type in the conf
     // file the way getHiveRecordWriter does, as ORC appears to read the value for itself.  Not
     // sure if this is correct or not.
-    return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
-        bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
+    return getRecordUpdater(jc, acidOutputFormat,
+        bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
   }
 
 
   private static RecordUpdater getRecordUpdater(JobConf jc,
                                                 AcidOutputFormat<?, ?> acidOutputFormat,
-                                                boolean isCompressed,
-                                                long txnId,
                                                 int bucket,
                                                 ObjectInspector inspector,
                                                 Properties tableProp,
                                                 Path outPath,
                                                 Reporter reporter,
-                                                int rowIdColNum) throws IOException {
+                                                int rowIdColNum,
+                                                FileSinkDesc conf) throws IOException {
     return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
-        .isCompressed(isCompressed)
+        .isCompressed(conf.getCompressed())
         .tableProperties(tableProp)
         .reporter(reporter)
         .writingBase(false)
-        .minimumTransactionId(txnId)
-        .maximumTransactionId(txnId)
+        .minimumTransactionId(conf.getTransactionId())
+        .maximumTransactionId(conf.getTransactionId())
         .bucket(bucket)
         .inspector(inspector)
-        .recordIdColumn(rowIdColNum));
+        .recordIdColumn(rowIdColNum)
+        .statementId(conf.getStatementId())
+        .finalDestination(conf.getDestPath()));
   }
 
   public static PartitionDesc getPartitionDescFromPathRecursively(

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8864013..3a9e64e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -439,13 +439,13 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     private final FileStatus file;
     private final FileInfo fileInfo;
     private final boolean isOriginal;
-    private final List<Long> deltas;
+    private final List<DeltaMetaData> deltas;
     private final boolean hasBase;
 
     SplitInfo(Context context, FileSystem fs,
         FileStatus file, FileInfo fileInfo,
         boolean isOriginal,
-        List<Long> deltas,
+        List<DeltaMetaData> deltas,
         boolean hasBase, Path dir, boolean[] covered) throws IOException {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
@@ -467,12 +467,12 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     FileSystem fs;
     List<FileStatus> files;
     boolean isOriginal;
-    List<Long> deltas;
+    List<DeltaMetaData> deltas;
     Path dir;
     boolean[] covered;
 
     public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
-        boolean isOriginal, List<Long> deltas, boolean[] covered) {
+        boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
       this.context = context;
       this.dir = dir;
       this.fs = fs;
@@ -543,14 +543,14 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   static final class BISplitStrategy extends ACIDSplitStrategy {
     List<FileStatus> fileStatuses;
     boolean isOriginal;
-    List<Long> deltas;
+    List<DeltaMetaData> deltas;
     FileSystem fs;
     Context context;
     Path dir;
 
     public BISplitStrategy(Context context, FileSystem fs,
         Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
-        List<Long> deltas, boolean[] covered) {
+        List<DeltaMetaData> deltas, boolean[] covered) {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
       this.fileStatuses = fileStatuses;
@@ -587,11 +587,11 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
    */
   static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
     Path dir;
-    List<Long> deltas;
+    List<DeltaMetaData> deltas;
     boolean[] covered;
     int numBuckets;
 
-    public ACIDSplitStrategy(Path dir, int numBuckets, List<Long> deltas, boolean[] covered) {
+    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
       this.dir = dir;
       this.numBuckets = numBuckets;
       this.deltas = deltas;
@@ -640,7 +640,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
       final SplitStrategy splitStrategy;
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
           context.conf, context.transactionList);
-      List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+      List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
       Path base = dirInfo.getBaseDirectory();
       List<FileStatus> original = dirInfo.getOriginalFiles();
       boolean[] covered = new boolean[context.numBuckets];
@@ -718,7 +718,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     private Metadata metadata;
     private List<OrcProto.Type> types;
     private final boolean isOriginal;
-    private final List<Long> deltas;
+    private final List<DeltaMetaData> deltas;
     private final boolean hasBase;
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index da23544..b58c880 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -37,7 +38,7 @@ public class OrcNewSplit extends FileSplit {
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
-  private final List<Long> deltas = new ArrayList<Long>();
+  private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
   private OrcFile.WriterVersion writerVersion;
 
   protected OrcNewSplit(){
@@ -67,8 +68,8 @@ public class OrcNewSplit extends FileSplit {
         (hasFooter ? OrcSplit.FOOTER_FLAG : 0);
     out.writeByte(flags);
     out.writeInt(deltas.size());
-    for(Long delta: deltas) {
-      out.writeLong(delta);
+    for(AcidInputFormat.DeltaMetaData delta: deltas) {
+      delta.write(out);
     }
     if (hasFooter) {
       // serialize FileMetaInfo fields
@@ -101,7 +102,9 @@ public class OrcNewSplit extends FileSplit {
     deltas.clear();
     int numDeltas = in.readInt();
     for(int i=0; i < numDeltas; i++) {
-      deltas.add(in.readLong());
+      AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+      dmd.readFields(in);
+      deltas.add(dmd);
     }
     if (hasFooter) {
       // deserialize FileMetaInfo fields
@@ -137,7 +140,7 @@ public class OrcNewSplit extends FileSplit {
     return hasBase;
   }
 
-  public List<Long> getDeltas() {
+  public List<AcidInputFormat.DeltaMetaData> getDeltas() {
     return deltas;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 728118a..2f11611 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -72,41 +72,55 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   /**
    * A RecordIdentifier extended with the current transaction id. This is the
    * key of our merge sort with the originalTransaction, bucket, and rowId
-   * ascending and the currentTransaction descending. This means that if the
+   * ascending and the currentTransaction and statementId descending. This means that if the
    * reader is collapsing events to just the last update, just the first
    * instance of each record is required.
    */
   final static class ReaderKey extends RecordIdentifier{
     private long currentTransactionId;
+    private int statementId;//sort on this descending, like currentTransactionId
 
     public ReaderKey() {
-      this(-1, -1, -1, -1);
+      this(-1, -1, -1, -1, 0);
     }
 
     public ReaderKey(long originalTransaction, int bucket, long rowId,
                      long currentTransactionId) {
+      this(originalTransaction, bucket, rowId, currentTransactionId, 0);
+    }
+    /**
+     * @param statementId - set this to 0 if N/A
+     */
+    public ReaderKey(long originalTransaction, int bucket, long rowId,
+                     long currentTransactionId, int statementId) {
       super(originalTransaction, bucket, rowId);
       this.currentTransactionId = currentTransactionId;
+      this.statementId = statementId;
     }
 
     @Override
     public void set(RecordIdentifier other) {
       super.set(other);
       currentTransactionId = ((ReaderKey) other).currentTransactionId;
+      statementId = ((ReaderKey) other).statementId;
     }
 
     public void setValues(long originalTransactionId,
                           int bucket,
                           long rowId,
-                          long currentTransactionId) {
+                          long currentTransactionId,
+                          int statementId) {
       setValues(originalTransactionId, bucket, rowId);
       this.currentTransactionId = currentTransactionId;
+      this.statementId = statementId;
     }
 
     @Override
     public boolean equals(Object other) {
       return super.equals(other) &&
-          currentTransactionId == ((ReaderKey) other).currentTransactionId;
+          currentTransactionId == ((ReaderKey) other).currentTransactionId
+            && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
+          ;
     }
 
     @Override
@@ -118,6 +132,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           if (currentTransactionId != oth.currentTransactionId) {
             return currentTransactionId < oth.currentTransactionId ? +1 : -1;
           }
+          if(statementId != oth.statementId) {
+            return statementId < oth.statementId ? +1 : -1;
+          }
         } else {
           return -1;
         }
@@ -125,6 +142,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return sup;
     }
 
+    /**
+     * This means 1 txn modified the same row more than once
+     */
+    private boolean isSameRow(ReaderKey other) {
+      return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
+    }
+    
     public long getCurrentTransactionId() {
       return currentTransactionId;
     }
@@ -142,7 +166,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     public String toString() {
       return "{originalTxn: " + getTransactionId() + ", bucket: " +
           getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
-          currentTransactionId + "}";
+          currentTransactionId + ", statementId: "+ statementId + "}";
     }
   }
 
@@ -159,6 +183,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     final ReaderKey key;
     final RecordIdentifier maxKey;
     final int bucket;
+    private final int statementId;
 
     /**
      * Create a reader that reads from the first key larger than minKey to any
@@ -170,17 +195,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * @param maxKey only return keys less than or equal to maxKey if it is
      *               non-null
      * @param options options to provide to read the rows.
+     * @param statementId id of SQL statement within a transaction
      * @throws IOException
      */
     ReaderPair(ReaderKey key, Reader reader, int bucket,
                RecordIdentifier minKey, RecordIdentifier maxKey,
-               ReaderImpl.Options options) throws IOException {
+               ReaderImpl.Options options, int statementId) throws IOException {
       this.reader = reader;
       this.key = key;
       this.maxKey = maxKey;
       this.bucket = bucket;
       // TODO use stripe statistics to jump over stripes
       recordReader = reader.rowsOptions(options);
+      this.statementId = statementId;
       // advance the reader until we reach the minimum key
       do {
         next(nextRecord);
@@ -195,7 +222,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
             OrcRecordUpdater.getBucket(nextRecord),
             OrcRecordUpdater.getRowId(nextRecord),
-            OrcRecordUpdater.getCurrentTransaction(nextRecord));
+            OrcRecordUpdater.getCurrentTransaction(nextRecord),
+            statementId);
 
         // if this record is larger than maxKey, we need to stop
         if (maxKey != null && key.compareRow(maxKey) > 0) {
@@ -223,7 +251,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
                        RecordIdentifier minKey, RecordIdentifier maxKey,
                        Reader.Options options) throws IOException {
-      super(key, reader, bucket, minKey, maxKey, options);
+      super(key, reader, bucket, minKey, maxKey, options, 0);
     }
 
     @Override
@@ -263,7 +291,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord.setFieldValue(OrcRecordUpdater.ROW,
               recordReader.next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(0L, bucket, nextRowId, 0L);
+        key.setValues(0L, bucket, nextRowId, 0L, 0);
         if (maxKey != null && key.compareRow(maxKey) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + maxKey);
@@ -415,7 +443,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.offset = options.getOffset();
     this.length = options.getLength();
     this.validTxnList = validTxnList;
-    // modify the optins to reflect the event instead of the base row
+    // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
     if (reader == null) {
       baseReader = null;
@@ -438,7 +466,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                                       options);
       } else {
         pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
-                              eventOptions);
+                              eventOptions, 0);
       }
 
       // if there is at least one record, put it in the map
@@ -458,13 +486,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       for(Path delta: deltaDirectory) {
         ReaderKey key = new ReaderKey();
         Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
+        AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
         FileSystem fs = deltaFile.getFileSystem(conf);
         long length = getLastFlushLength(fs, deltaFile);
         if (length != -1 && fs.exists(deltaFile)) {
           Reader deltaReader = OrcFile.createReader(deltaFile,
               OrcFile.readerOptions(conf).maxLength(length));
           ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
-            maxKey, eventOptions);
+            maxKey, eventOptions, deltaDir.getStatementId());
           if (deltaPair.nextRecord != null) {
             readers.put(key, deltaPair);
           }
@@ -580,9 +609,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         continue;
       }
 
+      /*for multi-statement txns, you may have multiple events for the same
+      * row in the same (current) transaction.  We want to collapse these to just the last one
+      * regardless of whether we are minor compacting.  Consider INSERT/UPDATE/UPDATE of the
+      * same row in the same txn.  There is no benefit in passing along anything except the last
+      * event.  If we did want to pass it along, we'd have to include statementId in the row
+      * returned so that compaction could write it out or make minor compaction understand
+      * how to write out delta files in delta_xxx_yyy_stid format.  There doesn't seem to be any
+      * value in this.*/
+      boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
       // if we are collapsing, figure out if this is a new row
-      if (collapse) {
-        keysSame = prevKey.compareRow(recordIdentifier) == 0;
+      if (collapse || isSameRow) {
+        keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
         if (!keysSame) {
           prevKey.set(recordIdentifier);
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index b576496..e4651b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -89,6 +89,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   private final IntWritable bucket = new IntWritable();
   private final LongWritable rowId = new LongWritable();
   private long insertedRows = 0;
+  private long rowIdOffset = 0;
   // This records how many rows have been inserted or deleted.  It is separate from insertedRows
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
@@ -263,6 +264,41 @@ public class OrcRecordUpdater implements RecordUpdater {
     item.setFieldValue(ROW_ID, rowId);
   }
 
+  /**
+   * To handle multiple INSERT... statements in a single transaction, we want to make sure
+   * to generate unique {@code rowId} for all inserted rows of the transaction.
+   * @return insert count read from the closest previous statement's bucket file that has inserts (maybe 0)
+   * @throws IOException
+   */
+  private long findRowIdOffsetForInsert() throws IOException {
+    /*
+    * 1. need to know bucket we are writing to
+    * 2. need to know which delta dir it's in
+    * Then,
+    * 1. find the same bucket file in previous delta dir for this txn
+    * 2. read the footer and get AcidStats which has insert count
+     * 2.1 if AcidStats.inserts>0 done
+     *  else go to previous delta file
+     *  For example, consider insert/update/insert case...*/
+    if(options.getStatementId() <= 0) {
+      return 0;//there is only 1 statement in this transaction (so far)
+    }
+    for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
+      Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
+      if(!fs.exists(matchingBucket)) {
+        continue;
+      }
+      Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration()));
+      //no close() on Reader?!
+      AcidStats acidStats = parseAcidStats(reader);
+      if(acidStats.inserts > 0) {
+        return acidStats.inserts;
+      }
+    }
+    //if we got here, we looked at all delta files in this txn prior to the current statement
+    //and didn't find any inserts...
+    return 0;
+  }
   // Find the record identifier column (if there) and return a possibly new ObjectInspector that
   // will strain out the record id for the underlying writer.
   private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
@@ -304,6 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater {
           recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
       rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
     }
+    else if(operation == INSERT_OPERATION) {
+      rowId += rowIdOffset;
+    }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
     item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
@@ -315,6 +354,9 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void insert(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
+      //findRowIdOffsetForInsert() is almost a no-op in the hcatalog.streaming case since
+      //statementId == 0 is always true in that case
+      rowIdOffset = findRowIdOffsetForInsert();
     }
     addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
     rowCountDelta++;
@@ -407,6 +449,22 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
     return result;
   }
+  /**
+   * {@link KeyIndexBuilder} creates these
+   */
+  static AcidStats parseAcidStats(Reader reader) {
+    String statsSerialized;
+    try {
+      ByteBuffer val =
+        reader.getMetadataValue(OrcRecordUpdater.ACID_STATS)
+          .duplicate();
+      statsSerialized = utf8Decoder.decode(val).toString();
+    } catch (CharacterCodingException e) {
+      throw new IllegalArgumentException("Bad string encoding for " +
+        OrcRecordUpdater.ACID_STATS, e);
+    }
+    return new AcidStats(statsSerialized);
+  }
 
   static class KeyIndexBuilder implements OrcFile.WriterCallback {
     StringBuilder lastKey = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 0c7dd40..8cf4cc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
@@ -41,7 +43,7 @@ public class OrcSplit extends FileSplit {
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
-  private final List<Long> deltas = new ArrayList<Long>();
+  private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
   private OrcFile.WriterVersion writerVersion;
   private long projColsUncompressedSize;
 
@@ -58,7 +60,7 @@ public class OrcSplit extends FileSplit {
 
   public OrcSplit(Path path, long offset, long length, String[] hosts,
       ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
-      List<Long> deltas, long projectedDataSize) {
+      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
     super(path, offset, length, hosts);
     this.fileMetaInfo = fileMetaInfo;
     hasFooter = this.fileMetaInfo != null;
@@ -78,8 +80,8 @@ public class OrcSplit extends FileSplit {
         (hasFooter ? FOOTER_FLAG : 0);
     out.writeByte(flags);
     out.writeInt(deltas.size());
-    for(Long delta: deltas) {
-      out.writeLong(delta);
+    for(AcidInputFormat.DeltaMetaData delta: deltas) {
+      delta.write(out);
     }
     if (hasFooter) {
       // serialize FileMetaInfo fields
@@ -112,7 +114,9 @@ public class OrcSplit extends FileSplit {
     deltas.clear();
     int numDeltas = in.readInt();
     for(int i=0; i < numDeltas; i++) {
-      deltas.add(in.readLong());
+      AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+      dmd.readFields(in);
+      deltas.add(dmd);
     }
     if (hasFooter) {
       // deserialize FileMetaInfo fields
@@ -148,7 +152,7 @@ public class OrcSplit extends FileSplit {
     return hasBase;
   }
 
-  public List<Long> getDeltas() {
+  public List<AcidInputFormat.DeltaMetaData> getDeltas() {
     return deltas;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index f8fff1a..445f606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -52,6 +52,14 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   private DbLockManager lockMgr = null;
   private IMetaStoreClient client = null;
   private long txnId = 0;
+  /**
+   * Assigns a unique monotonically increasing ID to each statement
+   * which is part of an open transaction.  This is used by the storage
+   * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
+   * to keep apart multiple writes of the same data within the same transaction.
+   * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}.
+   */
+  private int statementId = -1;
 
   DbTxnManager() {
   }
@@ -69,6 +77,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     init();
     try {
       txnId = client.openTxn(user);
+      statementId = 0;
       LOG.debug("Opened txn " + txnId);
       return txnId;
     } catch (TException e) {
@@ -222,7 +231,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       return null;
     }
 
-    List<HiveLock> locks = new ArrayList<HiveLock>(1); 
+    List<HiveLock> locks = new ArrayList<HiveLock>(1);
+    if(txnId > 0) {
+      statementId++;
+    }
     LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
     ctx.setHiveLocks(locks);
     return lockState;
@@ -249,6 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
+      statementId = -1;
     }
   }
 
@@ -270,6 +283,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
+      statementId = -1;
     }
   }
 
@@ -361,5 +375,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
     }
   }
+  @Override
+  public int getStatementId() {
+    return statementId;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 21ab8ee..1906982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -54,6 +54,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public int getStatementId() {
+    return 0;
+  }
+  @Override
   public HiveLockManager getLockManager() throws LockException {
     if (lockMgr == null) {
       boolean supportConcurrency =

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 2dd0c7d..6c3dc33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -127,4 +127,7 @@ public interface HiveTxnManager {
    * @return true if this transaction manager does ACID
    */
   boolean supportsAcid();
+
+  int getStatementId();
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b02374e..8516631 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6605,7 +6605,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       rsCtx.getNumFiles(),
       rsCtx.getTotalFiles(),
       rsCtx.getPartnCols(),
-      dpCtx);
+      dpCtx,
+      dest_path);
 
     // If this is an insert, update, or delete on an ACID table then mark that so the
     // FileSinkOperator knows how to properly write to it.

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index bb6cee5..f73b502 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -92,16 +92,21 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   // Record what type of write this is.  Default is non-ACID (ie old style).
   private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
   private long txnId = 0;  // transaction id for this operation
+  private int statementId = -1;
 
   private transient Table table;
+  private Path destPath;
 
   public FileSinkDesc() {
   }
 
+  /**
+   * @param destPath - the final destination for data
+   */
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
-      final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx) {
+      final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -114,6 +119,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.partitionCols = partitionCols;
     this.dpCtx = dpCtx;
     this.dpSortState = DPSortState.NONE;
+    this.destPath = destPath;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -135,7 +141,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx);
+        partitionCols, dpCtx, destPath);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -231,9 +237,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     return temporary;
   }
 
-  /**
-   * @param totalFiles the totalFiles to set
-   */
   public void setTemporary(boolean temporary) {
     this.temporary = temporary;
   }
@@ -438,11 +441,23 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public void setTransactionId(long id) {
     txnId = id;
   }
-
   public long getTransactionId() {
     return txnId;
   }
 
+  public void setStatementId(int id) {
+    statementId = id;
+  }
+  /**
+   * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)}
+   */
+  public int getStatementId() {
+    return statementId;
+  }
+  public Path getDestPath() {
+    return destPath;
+  }
+
   public Table getTable() {
     return table;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c5f2d4d..6c77ba4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -545,7 +545,9 @@ public class CompactorMR {
             .reporter(reporter)
             .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
             .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
-            .bucket(bucket);
+            .bucket(bucket)
+            .statementId(-1);//setting statementId == -1 makes compacted delta files use
+        //delta_xxxx_yyyy format
 
         // Instantiate the underlying output format
         @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index e400778..c6ae030 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -303,7 +303,8 @@ public class TestFileSinkOperator {
       Map<String, String> partColNames = new HashMap<String, String>(1);
       partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
       dpCtx.setInputToDPCols(partColNames);
-      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
+      //todo: does this need the finalDestination?
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 1e3df34..f8ded12 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -46,17 +46,23 @@ public class TestAcidUtils {
         AcidUtils.createFilename(p, options).toString());
     options.bucket(123);
     assertEquals("/tmp/00123_0",
-        AcidUtils.createFilename(p, options).toString());
+      AcidUtils.createFilename(p, options).toString());
     options.bucket(23)
         .minimumTransactionId(100)
         .maximumTransactionId(200)
         .writingBase(true)
         .setOldStyle(false);
     assertEquals("/tmp/base_0000200/bucket_00023",
-        AcidUtils.createFilename(p, options).toString());
+      AcidUtils.createFilename(p, options).toString());
     options.writingBase(false);
+    assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
+      AcidUtils.createFilename(p, options).toString());
+    options.statementId(-1);
     assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
-        AcidUtils.createFilename(p, options).toString());
+      AcidUtils.createFilename(p, options).toString());
+    options.statementId(7);
+    assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
+      AcidUtils.createFilename(p, options).toString());
   }
 
   @Test
@@ -236,7 +242,6 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
-        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
@@ -254,6 +259,45 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
   }
 
+  /**
+   * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns
+   * @throws Exception
+   */
+  @Test
+  public void testOverlapingDelta2() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+      AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+    assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(5, obsolete.size());
+    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString());
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(5, delts.size());
+    assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString());
+  }
+
   @Test
   public void deltasWithOpenTxnInRead() throws Exception {
     Configuration conf = new Configuration();
@@ -268,6 +312,27 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
   }
 
+  /**
+   * @since 1.3.0
+   * @throws Exception
+   */
+  @Test
+  public void deltasWithOpenTxnInRead2() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(2, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
+  }
+
   @Test
   public void deltasWithOpenTxnsNotInCompact() throws Exception {
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 56e5f9f..e96ab2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -927,7 +928,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
             fs.getFileStatus(new Path("/a/file")), null, true,
-            new ArrayList<Long>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
     OrcSplit result = splitter.createSplit(0, 200, null);
     assertEquals(0, result.getStart());
     assertEquals(200, result.getLength());
@@ -968,7 +969,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
             fs.getFileStatus(new Path("/a/file")), null, true,
-            new ArrayList<Long>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
     List<OrcSplit> results = splitter.call();
     OrcSplit result = results.get(0);
     assertEquals(3, result.getStart());
@@ -990,7 +991,7 @@ public class TestInputOutputFormat {
     conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
     context = new OrcInputFormat.Context(conf);
     splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
-      fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<Long>(),
+      fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(),
         true, null, null));
     results = splitter.call();
     for(int i=0; i < stripeSizes.length; ++i) {
@@ -1497,7 +1498,7 @@ public class TestInputOutputFormat {
     Path partDir = new Path(conf.get("mapred.input.dir"));
     OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
-            .writingBase(true).bucket(0).inspector(inspector));
+            .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir));
     for(int i=0; i < 100; ++i) {
       BigRow row = new BigRow(i);
       writer.insert(10, row);
@@ -1648,7 +1649,7 @@ public class TestInputOutputFormat {
     // write a base file in partition 0
     OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
-            .writingBase(true).bucket(0).inspector(inspector));
+            .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0]));
     for(int i=0; i < 10; ++i) {
       writer.insert(10, new MyRow(i, 2 * i));
     }
@@ -1661,7 +1662,7 @@ public class TestInputOutputFormat {
     // write a delta file in partition 0
     writer = new OrcRecordUpdater(partDir[0],
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
-            .writingBase(true).bucket(1).inspector(inspector));
+            .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0]));
     for(int i=10; i < 20; ++i) {
       writer.insert(10, new MyRow(i, 2*i));
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 921e954..39f71f1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -62,12 +62,12 @@ import static org.junit.Assert.assertNull;
 public class TestOrcRawRecordMerger {
 
   private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
-
+//todo: why is statementId -1?
   @Test
   public void testOrdering() throws Exception {
     ReaderKey left = new ReaderKey(100, 200, 1200, 300);
     ReaderKey right = new ReaderKey();
-    right.setValues(100, 200, 1000, 200);
+    right.setValues(100, 200, 1000, 200,1);
     assertTrue(right.compareTo(left) < 0);
     assertTrue(left.compareTo(right) > 0);
     assertEquals(false, left.equals(right));
@@ -76,16 +76,16 @@ public class TestOrcRawRecordMerger {
     assertEquals(true, right.equals(left));
     right.setRowId(2000);
     assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 4);
-    right.setValues(100, 2, 3, 4);
+    left.setValues(1, 2, 3, 4,-1);
+    right.setValues(100, 2, 3, 4,-1);
     assertTrue(left.compareTo(right) < 0);
     assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 4);
-    right.setValues(1, 100, 3, 4);
+    left.setValues(1, 2, 3, 4,-1);
+    right.setValues(1, 100, 3, 4,-1);
     assertTrue(left.compareTo(right) < 0);
     assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 100);
-    right.setValues(1, 2, 3, 4);
+    left.setValues(1, 2, 3, 100,-1);
+    right.setValues(1, 2, 3, 4,-1);
     assertTrue(left.compareTo(right) < 0);
     assertTrue(right.compareTo(left) > 0);
 
@@ -177,7 +177,7 @@ public class TestOrcRawRecordMerger {
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
     ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
-        new Reader.Options());
+        new Reader.Options(), 0);
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -203,7 +203,7 @@ public class TestOrcRawRecordMerger {
     Reader reader = createMockReader();
 
     ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
-        new Reader.Options());
+        new Reader.Options(), 0);
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -489,7 +489,7 @@ public class TestOrcRawRecordMerger {
     // write the empty base
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .inspector(inspector).bucket(BUCKET).writingBase(true)
-        .maximumTransactionId(100);
+        .maximumTransactionId(100).finalDestination(root);
     of.getRecordUpdater(root, options).close(false);
 
     ValidTxnList txnList = new ValidReadTxnList("200:");
@@ -515,6 +515,10 @@ public class TestOrcRawRecordMerger {
    */
   @Test
   public void testNewBaseAndDelta() throws Exception {
+    testNewBaseAndDelta(false);
+    testNewBaseAndDelta(true);
+  }
+  private void testNewBaseAndDelta(boolean use130Format) throws Exception {
     final int BUCKET = 10;
     String[] values = new String[]{"first", "second", "third", "fourth",
                                    "fifth", "sixth", "seventh", "eighth",
@@ -532,7 +536,10 @@ public class TestOrcRawRecordMerger {
 
     // write the base
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
-        .inspector(inspector).bucket(BUCKET);
+        .inspector(inspector).bucket(BUCKET).finalDestination(root);
+    if(!use130Format) {
+      options.statementId(-1);
+    }
     RecordUpdater ru = of.getRecordUpdater(root,
         options.writingBase(true).maximumTransactionId(100));
     for(String v: values) {
@@ -554,7 +561,8 @@ public class TestOrcRawRecordMerger {
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
-    assertEquals(new Path(root, "delta_0000200_0000200"),
+    assertEquals(new Path(root, use130Format ?
+        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
         directory.getCurrentDirectories().get(0).getPath());
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -829,7 +837,7 @@ public class TestOrcRawRecordMerger {
     // write a delta
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
-        .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5);
+        .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
     values = new String[]{"0.0", null, null, "1.1", null, null, null,
         "ignore.7"};
@@ -920,6 +928,7 @@ public class TestOrcRawRecordMerger {
     options.orcOptions(OrcFile.writerOptions(conf)
       .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
       .memory(mgr));
+    options.finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
     String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
         "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
@@ -1004,7 +1013,8 @@ public class TestOrcRawRecordMerger {
     AcidOutputFormat.Options options =
         new AcidOutputFormat.Options(conf)
             .bucket(BUCKET).inspector(inspector).filesystem(fs)
-            .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+            .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+          .finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
     String[] values = new String[]{"a", "b", "c", "d", "e"};
     for(int i=0; i < values.length; ++i) {
@@ -1047,6 +1057,14 @@ public class TestOrcRawRecordMerger {
    */
   @Test
   public void testRecordReaderIncompleteDelta() throws Exception {
+    testRecordReaderIncompleteDelta(false);
+    testRecordReaderIncompleteDelta(true);
+  }
+  /**
+   * 
+   * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_0001
+   */
+  private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
     final int BUCKET = 1;
     Configuration conf = new Configuration();
     OrcOutputFormat of = new OrcOutputFormat();
@@ -1063,7 +1081,10 @@ public class TestOrcRawRecordMerger {
     AcidOutputFormat.Options options =
         new AcidOutputFormat.Options(conf)
             .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
-            .bucket(BUCKET).inspector(inspector).filesystem(fs);
+            .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
+    if(!use130Format) {
+      options.statementId(-1);
+    }
     RecordUpdater ru = of.getRecordUpdater(root, options);
     String[] values= new String[]{"1", "2", "3", "4", "5"};
     for(int i=0; i < values.length; ++i) {
@@ -1110,8 +1131,8 @@ public class TestOrcRawRecordMerger {
     splits = inf.getSplits(job, 1);
     assertEquals(2, splits.length);
     rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
-    Path sideFile = new Path(root +
-        "/delta_0000010_0000019/bucket_00001_flush_length");
+    Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
+      AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
     assertEquals(true, fs.exists(sideFile));
     assertEquals(24, fs.getFileStatus(sideFile).getLen());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index 22bd4b9..22030b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -97,7 +97,8 @@ public class TestOrcRecordUpdater {
         .minimumTransactionId(10)
         .maximumTransactionId(19)
         .inspector(inspector)
-        .reporter(Reporter.NULL);
+        .reporter(Reporter.NULL)
+        .finalDestination(root);
     RecordUpdater updater = new OrcRecordUpdater(root, options);
     updater.insert(11, new MyRow("first"));
     updater.insert(11, new MyRow("second"));
@@ -197,7 +198,8 @@ public class TestOrcRecordUpdater {
         .maximumTransactionId(100)
         .inspector(inspector)
         .reporter(Reporter.NULL)
-        .recordIdColumn(1);
+        .recordIdColumn(1)
+        .finalDestination(root);
     RecordUpdater updater = new OrcRecordUpdater(root, options);
     updater.update(100, new MyRow("update", 30, 10, bucket));
     updater.delete(100, new MyRow("", 60, 40, bucket));

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 671e122..21adc9d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -241,7 +241,7 @@ public abstract class CompactorTest {
     return sd;
   }
 
-  // I can't do this with @Before because I want to be able to control when the thead starts
+  // I can't do this with @Before because I want to be able to control when the thread starts
   private void startThread(char type, boolean stopAfterOne) throws Exception {
     startThread(type, stopAfterOne, new AtomicBoolean());
   }
@@ -284,7 +284,7 @@ public abstract class CompactorTest {
     switch (type) {
       case BASE: filename = "base_" + maxTxn; break;
       case LENGTH_FILE: // Fall through to delta
-      case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+      case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
       case LEGACY: break; // handled below
     }
 
@@ -508,5 +508,21 @@ public abstract class CompactorTest {
     }
   }
 
+  /**
+   * in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
+   * the name was delta_xxxx_yyyy.  We want to run compaction tests such that both formats
+   * are used since new (1.3) code has to be able to read old files.
+   */
+  abstract boolean useHive130DeltaDirName();
 
+  String makeDeltaDirName(long minTxnId, long maxTxnId) {
+    return useHive130DeltaDirName() ?
+      AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+  }
+  /**
+   * delta dir name after compaction
+   */
+  String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+    return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index ffdbb9a..0db732c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -139,7 +139,7 @@ public class TestCleaner extends CompactorTest {
     boolean sawBase = false, sawDelta = false;
     for (Path p : paths) {
       if (p.getName().equals("base_20")) sawBase = true;
-      else if (p.getName().equals("delta_21_24")) sawDelta = true;
+      else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
       else Assert.fail("Unexpected file " + p.getName());
     }
     Assert.assertTrue(sawBase);
@@ -177,7 +177,7 @@ public class TestCleaner extends CompactorTest {
     boolean sawBase = false, sawDelta = false;
     for (Path path : paths) {
       if (path.getName().equals("base_20")) sawBase = true;
-      else if (path.getName().equals("delta_21_24")) sawDelta = true;
+      else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
       else Assert.fail("Unexpected file " + path.getName());
     }
     Assert.assertTrue(sawBase);
@@ -480,4 +480,8 @@ public class TestCleaner extends CompactorTest {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(0, rsp.getCompactsSize());
   }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
new file mode 100644
index 0000000..c637dd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
+ */
+public class TestCleaner2 extends TestCleaner {
+  public TestCleaner2() throws Exception {
+    super();
+  }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return true; // must differ from TestCleaner so the delta_xxxx_yyyy_zzzz names get exercised
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 00b13de..0b0b1da 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -713,5 +713,9 @@ public class TestInitiator extends CompactorTest {
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(0, compacts.size());
   }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
 
 }