You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ku...@apache.org on 2020/02/21 08:23:49 UTC

[hive] branch master updated: HIVE-21164: ACID: explore how we can avoid a move step during inserts/compaction (Marta Kuczora, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ffee30e  HIVE-21164: ACID: explore how we can avoid a move step during inserts/compaction (Marta Kuczora, reviewed by Peter Vary)
ffee30e is described below

commit ffee30e6267e85f00a22767262192abb9681cfb7
Author: Marta Kuczora <ku...@cloudera.com>
AuthorDate: Fri Feb 21 09:22:06 2020 +0100

    HIVE-21164: ACID: explore how we can avoid a move step during inserts/compaction (Marta Kuczora, reviewed by Peter Vary)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |    3 +
 .../hive/hcatalog/streaming/TestStreaming.java     |    2 +-
 .../org/apache/hadoop/hive/ql/TestAcidOnTez.java   |   20 +-
 .../hadoop/hive/ql/history/TestHiveHistory.java    |    2 +-
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |    3 +-
 .../test/resources/testconfiguration.properties    |    3 +
 .../hive/ql/exec/AbstractFileMergeOperator.java    |    6 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  126 +-
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |    8 +-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |   84 +-
 .../apache/hadoop/hive/ql/io/AcidInputFormat.java  |    4 +-
 .../apache/hadoop/hive/ql/io/AcidOutputFormat.java |   10 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   52 +-
 .../hadoop/hive/ql/io/HiveFileFormatUtils.java     |   14 +-
 .../apache/hadoop/hive/ql/io/RecordUpdater.java    |    6 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |    5 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java     |    5 +
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |   31 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java    |   14 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java |    4 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |   42 +-
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   |   24 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |   91 +-
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java  |    2 +-
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   |   28 +-
 .../apache/hadoop/hive/ql/plan/LoadTableDesc.java  |    9 +
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   44 +-
 .../apache/hadoop/hive/ql/util/UpgradeTool.java    |    1 -
 .../apache/hadoop/hive/ql/TestTxnAddPartition.java |    2 +-
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java |    2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |    4 +-
 .../apache/hadoop/hive/ql/TestTxnCommands3.java    |   14 +-
 .../apache/hadoop/hive/ql/TestTxnConcatenate.java  |   14 +-
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java     |    2 +-
 .../org/apache/hadoop/hive/ql/TestTxnLoadData.java |   20 +-
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java    |  154 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   17 +-
 .../apache/hadoop/hive/ql/exec/TestExecDriver.java |    2 +-
 .../hadoop/hive/ql/exec/TestFileSinkOperator.java  |   10 +-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  |    1 +
 .../hive/ql/txn/compactor/CompactorTest.java       |    2 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   |    4 +-
 .../tez_acid_union_dynamic_partition.q             |   20 +
 .../tez_acid_union_dynamic_partition_2.q           |   25 +
 .../clientpositive/tez_acid_union_multiinsert.q    |   94 +
 .../results/clientpositive/acid_subquery.q.out     |    9 +
 .../create_transactional_full_acid.q.out           |    9 +
 .../encryption_insert_partition_dynamic.q.out      |    4 +
 .../clientpositive/llap/acid_no_buckets.q.out      |   56 +-
 .../clientpositive/llap/insert_overwrite.q.out     |   10 +
 .../test/results/clientpositive/llap/mm_all.q.out  |   15 +
 .../llap/tez_acid_union_dynamic_partition.q.out    |   63 +
 .../llap/tez_acid_union_dynamic_partition_2.q.out  |   85 +
 .../llap/tez_acid_union_multiinsert.q.out          | 3481 ++++++++++++++++++++
 ql/src/test/results/clientpositive/mm_all.q.out    |   15 +
 .../org/apache/hive/streaming/TestStreaming.java   |    6 +-
 56 files changed, 4479 insertions(+), 304 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c872e69..d0a552a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3996,6 +3996,9 @@ public class HiveConf extends Configuration {
     HIVE_CREATE_TABLES_AS_INSERT_ONLY("hive.create.as.insert.only", false,
         "Whether the eligible tables should be created as ACID insert-only by default. Does \n" +
         "not apply to external tables, the ones using storage handlers, etc."),
+    HIVE_ACID_DIRECT_INSERT_ENABLED("hive.acid.direct.insert.enabled", true,
+        "Enable writing the data files directly to the table's final destination instead of the staging directory."
+        + "This optimization only applies on INSERT operations on ACID tables."),
     // role names are case-insensitive
     USERS_IN_ADMIN_ROLE("hive.users.in.admin.role", "", false,
         "Comma separated list of users who are in admin role for bootstrapping.\n" +
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index da677c7..569de70 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -392,7 +392,7 @@ public class TestStreaming {
     rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 056cd27..01b7a36 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -681,11 +681,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
     }
 
     String[][] expected2 = {
-      {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"}
+      {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000_0"},
+      {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000_0"},
+      {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000_0"},
+      {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000_0"},
+      {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000_0"}
     };
     Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
     for(int i = 0; i < expected2.length; i++) {
@@ -727,11 +727,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
       LOG.warn(s);
     }
     String[][] expected2 = {
-      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
-      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
-      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}
+      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001_0"},
+      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000_0"},
+      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001_0"},
+      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000_0"},
+      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001_0"}
     };
     Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
     for(int i = 0; i < expected2.length; i++) {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 31d15fd..127de23 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -112,7 +112,7 @@ public class TestHiveHistory {
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src,
-          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false, false);
         i++;
       }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index c2aa73b..e70d878 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -346,7 +347,7 @@ class CompactorTestUtil {
     conf.setBoolean("orc.schema.evolution.case.sensitive", false);
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
     AcidInputFormat.RawReader<OrcStruct> reader =
-        aif.getRawReader(conf, true, bucket, writeIdList, base, deltas);
+        aif.getRawReader(conf, true, bucket, writeIdList, base, deltas, new HashMap<String, String>());
     RecordIdentifier identifier = reader.createKey();
     OrcStruct value = reader.createValue();
     long currentTxn = min;
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 94467a4..42bc5df 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -435,6 +435,8 @@ minillap.query.files=acid_bucket_pruning.q,\
   results_cache_diff_fs.q,\
   tez_union_dynamic_partition.q,\
   tez_union_dynamic_partition_2.q,\
+  tez_acid_union_dynamic_partition.q,\
+  tez_acid_union_dynamic_partition_2.q,\
   unionDistinct_1.q,\
   whroot_external1.q,\
   load_fs2.q,\
@@ -839,6 +841,7 @@ minillaplocal.query.files=\
   tez_union_decimal.q,\
   tez_union_group_by.q,\
   tez_union_multiinsert.q,\
+  tez_acid_union_multiinsert.q,\
   tez_vector_dynpart_hashjoin_1.q,\
   tez_vector_dynpart_hashjoin_2.q,\
   timestamp_4.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 9a32581..d68d8f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -257,7 +257,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
           assert finalPath.equals(outPath);
           // There's always just one file that we have merged.
           // The union/DP/etc. should already be account for in the path.
-          Utilities.writeMmCommitManifest(Lists.newArrayList(outPath),
+          Utilities.writeCommitManifest(Lists.newArrayList(outPath),
               tmpPath.getParent(), fs, taskId, conf.getWriteId(), conf.getStmtId(), null, false);
           LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes).");
         }
@@ -337,8 +337,8 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
             lbLevels = conf.getListBucketingDepth();
         // We don't expect missing buckets from mere (actually there should be no buckets),
         // so just pass null as bucketing context. Union suffix should also be accounted for.
-        Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
-            dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false);
+        Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success,
+            dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false, false);
       }
 
     } catch (IOException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 9ad4e71..d5e1b5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -161,6 +161,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     private final String subdirForTxn;
     private Path taskOutputTempPathRoot;
     Path[] outPaths;
+    // The bucket files we successfully wrote to in this writer
+    Path[] outPathsCommitted;
     Path[] finalPaths;
     RecordWriter[] outWriters;
     RecordUpdater[] updaters;
@@ -168,19 +170,31 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     int acidLastBucket = -1;
     int acidFileOffset = -1;
     private boolean isMmTable;
+    private boolean isDirectInsert;
+    private boolean isInsertOverwrite;
     String dpDirForCounters;
 
-    public FSPaths(Path specPath, boolean isMmTable) {
+    public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert, boolean isInsertOverwrite) {
       this.isMmTable = isMmTable;
-      if (!isMmTable) {
+      this.isDirectInsert = isDirectInsert;
+      this.isInsertOverwrite = isInsertOverwrite;
+      if (!isMmTable && !isDirectInsert) {
         tmpPathRoot = Utilities.toTempPath(specPath);
         taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath);
         subdirForTxn = null;
       } else {
         tmpPathRoot = specPath;
         taskOutputTempPathRoot = null; // Should not be used.
-        subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(),
-            conf.getTableWriteId(), conf.getTableWriteId(),  conf.getStatementId());
+        if (isMmTable) {
+          subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(),
+              conf.getTableWriteId(), conf.getTableWriteId(),  conf.getStatementId());
+        } else {
+          /**
+           * For direct write to final path during ACID insert, we create the delta directories
+           * later when we create the RecordUpdater using AcidOutputFormat.Options
+           */
+          subdirForTxn = null;
+        }
       }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles
@@ -188,6 +202,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
 
       outPaths = new Path[numFiles];
+      outPathsCommitted = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
       updaters = new RecordUpdater[numFiles];
@@ -211,6 +226,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       try {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
+            SerDeStats stats = updaters[i].getStats();
+            // Ignore 0 row files except in case of insert overwrite
+            if (isDirectInsert && (stats.getRowCount() > 0 || isInsertOverwrite)) {
+              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            }
             updaters[i].close(abort);
           }
         }
@@ -243,13 +263,19 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
       if(outPaths[idx] != null && fs.exists(outPaths[idx])) {
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-          Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to "
-              + finalPaths[idx] + " (" + isMmTable + ")");
+          Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (mm table ="
+              + isMmTable + ", direct insert = " + isDirectInsert + ")");
         }
         if (isMmTable) {
           assert outPaths[idx].equals(finalPaths[idx]);
           commitPaths.add(outPaths[idx]);
-        } else if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+        } else if (isDirectInsert && (outPathsCommitted[idx] != null)) {
+          if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
+            Utilities.FILE_OP_LOGGER
+                .trace("committing " + outPathsCommitted[idx] + " (direct insert = " + isDirectInsert + ")");
+          }
+          commitPaths.add(outPathsCommitted[idx]);
+        } else if (!isDirectInsert && !fs.rename(outPaths[idx], finalPaths[idx])) {
             FileStatus fileStatus = FileUtils.getFileStatusOrNull(fs, finalPaths[idx]);
             if (fileStatus != null) {
               LOG.warn("Target path " + finalPaths[idx] + " with a size " + fileStatus.getLen() + " exists. Trying to delete it.");
@@ -264,15 +290,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
             }
         }
       }
-
       updateProgress();
     }
 
-    public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
-      //should this close updaters[]?
+    public void abortWritersAndUpdaters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
+            LOG.debug("Aborted: closing: " + outWriters[idx].toString());
             outWriters[idx].close(abort);
             if (delete) {
               fs.delete(outPaths[idx], true);
@@ -283,6 +308,20 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
         }
       }
+      for (int idx = 0; idx < updaters.length; idx++) {
+        if (updaters[idx] != null) {
+          try {
+            LOG.debug("Aborted: closing: " + updaters[idx].toString());
+            updaters[idx].close(abort);
+            if (delete) {
+              fs.delete(outPaths[idx], true);
+            }
+            updateProgress();
+          } catch (IOException e) {
+            throw new HiveException(e);
+          }
+        }
+      }
     }
 
     public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable,
@@ -290,7 +329,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       if (isNativeTable) {
         String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
         String taskWithExt = extension == null ? taskId : taskId + extension;
-        if (!isMmTable) {
+        if (!isMmTable && !isDirectInsert) {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
             finalPaths[filesIdx] = new Path(parent, taskWithExt);
           } else {
@@ -308,7 +347,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
             taskIdPath += extension;
           }
 
-          Path finalPath = new Path(buildTmpPath(), taskIdPath);
+          Path finalPath;
+          if (isDirectInsert) {
+            finalPath = buildTmpPath();
+          } else {
+            finalPath = new Path(buildTmpPath(), taskIdPath);
+          }
 
           // In the cases that have multi-stage insert, e.g. a "hive.skewjoin.key"-based skew join,
           // it can happen that we want multiple commits into the same directory from different
@@ -319,7 +363,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           // affects some less obscure scenario.
           try {
             FileSystem fpfs = finalPath.getFileSystem(hconf);
-            if (fpfs.exists(finalPath)) {
+            if ((!isDirectInsert) && fpfs.exists(finalPath)) {
               throw new RuntimeException(finalPath + " already exists");
             }
           } catch (IOException e) {
@@ -330,7 +374,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
         if (LOG.isInfoEnabled()) {
           LOG.info("Final Path: FS " + finalPaths[filesIdx]);
-          if (LOG.isInfoEnabled() && !isMmTable) {
+          if (LOG.isInfoEnabled() && (!isMmTable && !isDirectInsert)) {
             LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
           }
         }
@@ -468,7 +512,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       unionPath = null;
     } else {
       isUnionDp = (dpCtx != null);
-      if (conf.isMmTable() || isUnionDp) {
+      if (conf.isDirectInsert()) {
+        specPath = conf.getParentDir();
+        unionPath = null;
+      } else if (conf.isMmTable() || isUnionDp) {
         // MM tables need custom handling for union suffix; DP tables use parent too.
         specPath = conf.getParentDir();
         unionPath = conf.getDirName().getName();
@@ -526,7 +573,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         throw ex;
       }
       isCompressed = conf.getCompressed();
-      parent = Utilities.toTempPath(conf.getDirName());
+      if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
+        parent = Utilities.toTempPath(conf.getFinalDirName());
+      } else {
+        parent = Utilities.toTempPath(conf.getDirName());
+      }
       statsFromRecordWriter = new boolean[numFiles];
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
@@ -565,7 +616,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
 
       if (!bDynParts) {
-        fsp = new FSPaths(specPath, conf.isMmTable());
+        fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite());
         fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath);
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp)
@@ -740,7 +791,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       assert filesIdx == numFiles;
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
-      if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable()) {
+      if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable() && !conf.isDirectInsert()) {
         autoDelete = fs.deleteOnExit(fsp.outPaths[0]);
       }
     } catch (Exception e) {
@@ -764,7 +815,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
       }
 
-      if (isNativeTable() && !conf.isMmTable()) {
+      if (isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()) {
         // in recent hadoop versions, use deleteOnExit to clean tmp files.
         autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
       }
@@ -790,12 +841,21 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
             StatsProvidingRecordWriter;
         // increment the CREATED_FILES counter
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+        Path outPath = fsp.outPaths[filesIdx];
+        if (conf.isDirectInsert()
+            && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
+          LOG.warn("Unable to create directory: " + outPath);
+        }
         // Only set up the updater for insert.  For update and delete we don't know unitl we see
         // the row.
         ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
         int acidBucketNum = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
+        String attemptId = null;
+        if (conf.isDirectInsert()) {
+          attemptId = taskId.split("_")[1];
+        }
         fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
-            acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
+            acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1, attemptId); // outPath.getParent()
       }
 
       if (reporter != null) {
@@ -824,9 +884,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // <table-dir>/<staging-dir>/<partition-dir>/<union-dir>
 
       // for non-MM tables, the final destination partition directory is created during move task via rename
-      // for MM tables, the final destination partition directory is created by the tasks themselves
+      // for MM tables and ACID insert, the final destination partition directory is created by the tasks themselves
       try {
-        if (conf.isMmTable()) {
+        if (conf.isMmTable() || conf.isDirectInsert()) {
           createDpDir(destPartPath);
         } else {
           // outPath will be
@@ -1086,7 +1146,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
    * @throws HiveException
    */
   private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException {
-    FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable());
+    FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite());
     fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath);
     fsp2.subdirBeforeTxn = dpDir;
     String pathKey = combinePathFragments(dpDir, lbDir);
@@ -1337,9 +1397,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths);
         }
       }
-      if (conf.isMmTable()) {
-        Utilities.writeMmCommitManifest(commitPaths, specPath, fs, originalTaskId,
-                conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
+      if (conf.isMmTable() || conf.isDirectInsert()) {
+        Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), conf
+            .getStatementId(), unionPath, conf.getInsertOverwrite());
       }
       // Only publish stats if this operator's flag was set to gather stats
       if (conf.isGatherStats()) {
@@ -1350,7 +1410,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // Hadoop always call close() even if an Exception was thrown in map() or
       // reduce().
       for (FSPaths fsp : valToPaths.values()) {
-        fsp.abortWriters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable());
+        fsp.abortWritersAndUpdaters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable());
       }
     }
     fsp = prevFsp = null;
@@ -1383,10 +1443,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           specPath = conf.getParentDir();
           unionSuffix = conf.getDirName().getName();
         }
+        if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
+          specPath = conf.getParentDir();
+          unionSuffix = null;
+        }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath);
         }
-        if (!conf.isMmTable()) {
+        if (!conf.isMmTable() && !conf.isDirectInsert()) {
           Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter);
         } else {
           int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
@@ -1396,9 +1460,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
               : (dpCtx != null ? dpCtx.getNumBuckets() : 0);
           MissingBucketsContext mbc = new MissingBucketsContext(
               conf.getTableInfo(), numBuckets, conf.getCompressed());
-          Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
-              dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter,
-              conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite());
+          Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc,
+              conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf
+                  .getInsertOverwrite(), conf.isDirectInsert());
         }
       }
     } catch (IOException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 06e4ebe..51de87f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -419,7 +419,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
                   work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp,
                   resetStatisticsProps(table), tbd.getWriteId(), tbd.getStmtId(),
-                  tbd.isInsertOverwrite());
+                  tbd.isInsertOverwrite(), tbd.isDirectInsert());
           if (work.getOutputs() != null) {
             DDLUtils.addIfAbsentByName(new WriteEntity(table,
               getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
@@ -521,7 +521,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
                     !tbd.isMmTable(),
             resetStatisticsProps(table), tbd.getWriteId(), tbd.getStmtId(),
-            tbd.isInsertOverwrite());
+            tbd.isInsertOverwrite(), tbd.isDirectInsert());
     Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 
     // See the comment inside updatePartitionBucketSortColumns.
@@ -568,7 +568,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         tbd.getStmtId(),
         resetStatisticsProps(table),
         work.getLoadTableWork().getWriteType(),
-        tbd.isInsertOverwrite());
+        tbd.isInsertOverwrite(),
+        tbd.isDirectInsert()
+        );
 
     // publish DP columns to its subscribers
     if (dps != null && dps.size() > 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6c67bc7..e9966e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1963,9 +1963,9 @@ public final class Utilities {
     // This can happen in ACID cases when we have splits on delta files, where the filenames
     // are of the form delta_x_y/bucket_a.
     if (bucketName.startsWith(AcidUtils.BUCKET_PREFIX)) {
-      m = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(bucketName);
+      m = AcidUtils.BUCKET_PATTERN.matcher(bucketName);
       if (m.find()) {
-          return Integer.parseInt(m.group());
+        return Integer.parseInt(m.group(1));
       }
       // Note that legacy bucket digit pattern are being ignored here.
     }
@@ -3694,8 +3694,9 @@ public final class Utilities {
 
       if (op instanceof FileSinkOperator) {
         FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
-        if (fdesc.isMmTable()) {
-          continue; // No need to create for MM tables
+        if (fdesc.isMmTable() || fdesc.isDirectInsert()) {
+          // No need to create for MM tables, or ACID insert
+          continue;
         }
         Path tempDir = fdesc.getDirName();
         if (tempDir != null) {
@@ -4108,7 +4109,7 @@ public final class Utilities {
     }
   }
 
-  public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
+  public static Path[] getDirectInsertDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
       PathFilter filter, long writeId, int stmtId, Configuration conf,
       Boolean isBaseDir) throws IOException {
     int skipLevels = dpLevels;
@@ -4124,9 +4125,9 @@ public final class Utilities {
     //       /want/ to know isBaseDir because that is error prone; so, it ends up never being used.
     if (stmtId < 0 || isBaseDir == null
         || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) {
-      return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
+      return getDirectInsertDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
     }
-    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir);
+    return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir);
   }
 
   private static boolean isS3(FileSystem fs) {
@@ -4149,7 +4150,7 @@ public final class Utilities {
     return paths;
   }
 
-  private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs,
+  private static Path[] getDirectInsertDirectoryCandidatesRecursive(FileSystem fs,
       Path path, int skipLevels, PathFilter filter) throws IOException {
     String lastRelDir = null;
     HashSet<Path> results = new HashSet<Path>();
@@ -4202,7 +4203,7 @@ public final class Utilities {
     return results.toArray(new Path[results.size()]);
   }
 
-  private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels,
+  private static Path[] getDirectInsertDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels,
       PathFilter filter, long writeId, int stmtId, boolean isBaseDir) throws IOException {
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
     for (int i = 0; i < skipLevels; i++) {
@@ -4219,10 +4220,10 @@ public final class Utilities {
     return statusToPath(fs.globStatus(pathPattern, filter));
   }
 
-  private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
+  private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, Path manifestDir,
       int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId,
       Configuration conf) throws IOException {
-    Path[] files = getMmDirectoryCandidates(
+    Path[] files = getDirectInsertDirectoryCandidates(
         fs, specPath, dpLevels, filter, writeId, stmtId, conf, null);
     if (files != null) {
       for (Path path : files) {
@@ -4235,7 +4236,7 @@ public final class Utilities {
   }
 
 
-  public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
+  public static void writeCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
       String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException {
     if (commitPaths.isEmpty()) {
       return;
@@ -4280,15 +4281,15 @@ public final class Utilities {
     }
   }
 
-  public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
+  public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId,
-      Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite)
-          throws IOException, HiveException {
+      Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert)
+      throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
     Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite);
     if (!success) {
       AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId);
-      tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
+      tryDeleteAllDirectInsertFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
           filter, writeId, stmtId, hconf);
       return;
     }
@@ -4317,13 +4318,13 @@ public final class Utilities {
       Utilities.FILE_OP_LOGGER.info("Creating directory with no output at {}", specPath);
       FileUtils.mkdir(fs, specPath, hconf);
     }
-    Path[] files = getMmDirectoryCandidates(
+    Path[] files = getDirectInsertDirectoryCandidates(
         fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite);
-    ArrayList<Path> mmDirectories = new ArrayList<>();
+    ArrayList<Path> directInsertDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
         Utilities.FILE_OP_LOGGER.trace("Looking at path: {}", path);
-        mmDirectories.add(path);
+        directInsertDirectories.add(path);
       }
     }
 
@@ -4356,15 +4357,15 @@ public final class Utilities {
       }
     }
 
-    for (Path path : mmDirectories) {
-      cleanMmDirectory(path, fs, unionSuffix, lbLevels, committed);
+    for (Path path : directInsertDirectories) {
+      cleanDirectInsertDirectory(path, fs, unionSuffix, lbLevels, committed);
     }
 
     if (!committed.isEmpty()) {
       throw new HiveException("The following files were committed but not found: " + committed);
     }
 
-    if (mmDirectories.isEmpty()) {
+    if (directInsertDirectories.isEmpty()) {
       return;
     }
 
@@ -4373,19 +4374,22 @@ public final class Utilities {
     if (lbLevels != 0) {
       return;
     }
-    // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
-    // doesn't need tocheck anything except path and directory status for MM directories.
-    FileStatus[] finalResults = new FileStatus[mmDirectories.size()];
-    for (int i = 0; i < mmDirectories.size(); ++i) {
-      finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i));
-    }
-    List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
-        unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, writeId, stmtId,
+
+    if (!isDirectInsert) {
+      // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
+      // doesn't need to check anything except path and directory status for MM directories.
+      FileStatus[] finalResults = new FileStatus[directInsertDirectories.size()];
+      for (int i = 0; i < directInsertDirectories.size(); ++i) {
+        finalResults[i] = new PathOnlyFileStatus(directInsertDirectories.get(i));
+      }
+      List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
+          unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, writeId, stmtId,
             isMmTable, null, isInsertOverwrite);
-    // create empty buckets if necessary
-    if (!emptyBuckets.isEmpty()) {
-      assert mbc != null;
-      Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter);
+      // create empty buckets if necessary
+      if (!emptyBuckets.isEmpty()) {
+        assert mbc != null;
+        Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter);
+      }
     }
   }
 
@@ -4395,7 +4399,7 @@ public final class Utilities {
     }
   }
 
-  private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix,
+  private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix,
       int lbLevels, HashSet<Path> committed) throws IOException, HiveException {
     for (FileStatus child : fs.listStatus(dir)) {
       Path childPath = child.getPath();
@@ -4406,7 +4410,7 @@ public final class Utilities {
         if (child.isDirectory()) {
           Utilities.FILE_OP_LOGGER.trace(
               "Recursion into LB directory {}; levels remaining ", childPath, lbLevels - 1);
-          cleanMmDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed);
+          cleanDirectInsertDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed);
         } else {
           if (committed.contains(childPath)) {
             throw new HiveException("LB FSOP has commited "
@@ -4421,7 +4425,9 @@ public final class Utilities {
         if (committed.remove(childPath)) {
           continue; // A good file.
         }
-        deleteUncommitedFile(childPath, fs);
+        if (!childPath.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT)) {
+          deleteUncommitedFile(childPath, fs);
+        }
       } else if (!child.isDirectory()) {
         if (committed.contains(childPath)) {
           throw new HiveException("Union FSOP has commited "
@@ -4429,8 +4435,8 @@ public final class Utilities {
         }
         deleteUncommitedFile(childPath, fs);
       } else if (childPath.getName().equals(unionSuffix)) {
-        // Found the right union directory; treat it as "our" MM directory.
-        cleanMmDirectory(childPath, fs, null, 0, committed);
+        // Found the right union directory; treat it as "our" directory.
+        cleanDirectInsertDirectory(childPath, fs, null, 0, committed);
       } else {
         String childName = childPath.getName();
         if (!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index bba3960..034d2d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -34,6 +34,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The interface required for input formats that what to support ACID
@@ -258,7 +259,8 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
                              int bucket,
                              ValidWriteIdList validWriteIdList,
                              Path baseDirectory,
-                             Path[] deltaDirectory
+                             Path[] deltaDirectory,
+                             Map<String, String> deltasToAttemptId
                              ) throws IOException;
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 1e8bb22..a36630a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -51,6 +51,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private Reporter reporter;
     private long minimumWriteId;
     private long maximumWriteId;
+    private String attemptId;
     /**
      * actual bucketId (as opposed to bucket property via BucketCodec)
      */
@@ -240,6 +241,11 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return this;
     }
 
+    public Options attemptId(String attemptId) {
+      this.attemptId = attemptId;
+      return this;
+    }
+
     /**
      * @since 1.3.0
      * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
@@ -313,6 +319,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return bucketId;
     }
 
+    public String getAttemptId() {
+      return attemptId;
+    }
+
     public int getRecordIdColumn() {
       return recIdCol;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2f5ec52..5d57509 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Strings;
@@ -160,8 +161,8 @@ public class AcidUtils {
    * This must be in sync with {@link #STATEMENT_DIGITS}
    */
   public static final int MAX_STATEMENTS_PER_TXN = 10000;
-  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
-  public static final Pattern   LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+  public static final Pattern BUCKET_PATTERN = Pattern.compile("bucket_([0-9]+)(_[0-9]+)?$");
 
   /**
    * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1
@@ -180,7 +181,6 @@ public class AcidUtils {
   }
   private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
 
-  public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$");
   public static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
   /**
@@ -241,7 +241,11 @@ public class AcidUtils {
    * @return the filename
    */
   public static Path createBucketFile(Path subdir, int bucket) {
-    return createBucketFile(subdir, bucket, true);
+    return createBucketFile(subdir, bucket, null, true);
+  }
+
+  public static Path createBucketFile(Path subdir, int bucket, String attemptId) {
+    return createBucketFile(subdir, bucket, attemptId, true);
   }
 
   /**
@@ -250,10 +254,13 @@ public class AcidUtils {
    * @param bucket the bucket number
    * @return the filename
    */
-  private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) {
+  private static Path createBucketFile(Path subdir, int bucket, String attemptId, boolean isAcidSchema) {
     if(isAcidSchema) {
-      return new Path(subdir,
-        BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
+      String fileName = BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket);
+      if (attemptId != null) {
+        fileName = fileName + "_" + attemptId;
+      }
+      return new Path(subdir, fileName);
     }
     else {
       return new Path(subdir,
@@ -353,7 +360,7 @@ public class AcidUtils {
       return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
           options.getBucketId()) + "_0");
     } else {
-      return createBucketFile(baseOrDeltaSubdirPath(directory, options), options.getBucketId());
+      return createBucketFile(baseOrDeltaSubdirPath(directory, options), options.getBucketId(), options.getAttemptId());
     }
   }
 
@@ -421,11 +428,32 @@ public class AcidUtils {
     if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
       return Integer.parseInt(filename.substring(0, filename.indexOf('_')));
     } else if (filename.startsWith(BUCKET_PREFIX)) {
-      return Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
+      Matcher matcher = BUCKET_PATTERN.matcher(filename);
+      if (matcher.matches()) {
+        String bucketId = matcher.group(1);
+        filename = filename.substring(0,matcher.end(1));
+        if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
+          Utilities.FILE_OP_LOGGER.debug("Parsing bucket ID = " + bucketId + " from file name '" + filename + "'");
+        }
+        return Integer.parseInt(bucketId);
+      }
     }
     return -1;
   }
 
+  public static String parseAttemptId(Path bucketFile) {
+    String filename = bucketFile.getName();
+    Matcher matcher = BUCKET_PATTERN.matcher(filename);
+    String attemptId = null;
+    if (matcher.matches()) {
+      attemptId = matcher.group(2) != null ? matcher.group(2).substring(1) : null;
+    }
+    if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
+      Utilities.FILE_OP_LOGGER.debug("Parsing attempt ID = " + attemptId + " from file name '" + bucketFile + "'");
+    }
+    return attemptId;
+  }
+
   /**
    * Read the first row of an ORC file and determine the bucket ID based on the bucket column. This only works with
    * files with ACID schema.
@@ -460,6 +488,7 @@ public class AcidUtils {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
     int bucket = parseBucketId(bucketFile);
+    String attemptId = parseAttemptId(bucketFile);
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
       result
           .setOldStyle(true)
@@ -494,7 +523,8 @@ public class AcidUtils {
             .setOldStyle(false)
             .minimumWriteId(parsedDelta.minWriteId)
             .maximumWriteId(parsedDelta.maxWriteId)
-            .bucket(bucket);
+            .bucket(bucket)
+            .attemptId(attemptId);
       } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
         ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
           bucketFile.getFileSystem(conf), null);
@@ -2559,7 +2589,7 @@ public class AcidUtils {
    */
   public static final class OrcAcidVersion {
     private static final String ACID_VERSION_KEY = "hive.acid.version";
-    private static final String ACID_FORMAT = "_orc_acid_version";
+    public static final String ACID_FORMAT = "_orc_acid_version";
     private static final Charset UTF8 = Charset.forName("UTF-8");
     public static final int ORC_ACID_VERSION_DEFAULT = 0;
     /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 8980a62..22099e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -306,10 +306,15 @@ public final class HiveFileFormatUtils {
     return (HiveOutputFormat<?, ?>) outputFormat;
   }
 
+  public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket, FileSinkDesc conf,
+      Path outPath, ObjectInspector inspector, Reporter reporter, int rowIdColNum) throws HiveException, IOException {
+    return getAcidRecordUpdater(jc, tableInfo, bucket, conf, outPath, inspector, reporter, rowIdColNum, null);
+  }
+
   public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket,
                                                    FileSinkDesc conf, Path outPath,
                                                    ObjectInspector inspector,
-                                                   Reporter reporter, int rowIdColNum)
+                                                   Reporter reporter, int rowIdColNum, String attemptId)
       throws HiveException, IOException {
     HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
     AcidOutputFormat<?, ?> acidOutputFormat = null;
@@ -323,10 +328,9 @@ public final class HiveFileFormatUtils {
     // file the way getHiveRecordWriter does, as ORC appears to read the value for itself.  Not
     // sure if this is correct or not.
     return getRecordUpdater(jc, acidOutputFormat,
-        bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
+        bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf, attemptId);
   }
 
-
   private static RecordUpdater getRecordUpdater(JobConf jc,
                                                 AcidOutputFormat<?, ?> acidOutputFormat,
                                                 int bucket,
@@ -335,7 +339,8 @@ public final class HiveFileFormatUtils {
                                                 Path outPath,
                                                 Reporter reporter,
                                                 int rowIdColNum,
-                                                FileSinkDesc conf) throws IOException {
+                                                FileSinkDesc conf,
+                                                String attemptId) throws IOException {
     return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
         .isCompressed(conf.getCompressed())
         .tableProperties(tableProp)
@@ -348,6 +353,7 @@ public final class HiveFileFormatUtils {
         .recordIdColumn(rowIdColNum)
         .statementId(conf.getStatementId())
         .finalDestination(conf.getDestPath())
+        .attemptId(attemptId)
         .temporary(conf.isTemporary()));
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
index 737e677..bb257d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 
 /**
@@ -79,4 +80,9 @@ public interface RecordUpdater {
    * @return - buffered row count
    */
   long getBufferedRowCount();
+
+  /**
+   * Returns the path of the file this updater wrote to
+   */
+  public Path getUpdatedFilePath();
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index a069032..03f7086 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2459,7 +2459,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                            int bucket,
                                            ValidWriteIdList validWriteIdList,
                                            Path baseDirectory,
-                                           Path[] deltaDirectory
+                                           Path[] deltaDirectory,
+                                           Map<String, String> deltasToAttemptId
                                            ) throws IOException {
     boolean isOriginal = false;
     OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true)
@@ -2481,7 +2482,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       mergerOptions.rootPath(deltaDirectory[0].getParent());
     }
     return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
-        bucket, validWriteIdList, new Reader.Options(conf), deltaDirectory, mergerOptions);
+        bucket, validWriteIdList, new Reader.Options(conf), deltaDirectory, mergerOptions, deltasToAttemptId);
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index c4c56f8..202f78b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -268,6 +268,11 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
       stringifyObject(buffer, obj, inspector);
       return buffer.toString();
     }
+
+    @Override
+    public Path getUpdatedFilePath() {
+      return null;
+    }
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index b8a0f04..f543418 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -928,6 +927,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
     }
   }
+
+  OrcRawRecordMerger(Configuration conf,
+      boolean collapseEvents,
+      Reader reader,
+      boolean isOriginal,
+      int bucket,
+      ValidWriteIdList validWriteIdList,
+      Reader.Options options,
+      Path[] deltaDirectory,
+      Options mergerOptions) throws IOException {
+    this(conf, collapseEvents, reader, isOriginal, bucket, validWriteIdList, options, deltaDirectory, mergerOptions,
+        null);
+  }
+
   /**
    * Create a reader that merge sorts the ACID events together.  This handles
    * 1. 'normal' reads on behalf of a query (non vectorized)
@@ -952,7 +965,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                      int bucket,
                      ValidWriteIdList validWriteIdList,
                      Reader.Options options,
-                     Path[] deltaDirectory, Options mergerOptions) throws IOException {
+                     Path[] deltaDirectory,
+                     Options mergerOptions,
+                     Map<String, String> deltasToAttemptId) throws IOException {
     this.collapse = collapseEvents;
     this.offset = options.getOffset();
     this.length = options.getLength();
@@ -1126,7 +1141,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           }
           continue;
         }
-        for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions)) {
+
+        String attemptId = null;
+        if (deltasToAttemptId != null) {
+          attemptId = deltasToAttemptId.get(delta.toString());
+        }
+
+        for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions, attemptId)) {
           FileSystem fs = deltaFile.getFileSystem(conf);
           if(!fs.exists(deltaFile)) {
             /**
@@ -1264,12 +1285,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
    * For unbucketed tables {@code bucket} can be thought of as a write tranche.
    */
-  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions) {
+  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions, String attemptId) {
     assert (!mergerOptions.isCompacting &&
         deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
     ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory +
         "(isCompacting=" + mergerOptions.isCompacting() + ")";
-    return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket)};
+    return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket, attemptId)};
   }
   
   @VisibleForTesting
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 398698e..2d6a771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -621,7 +621,14 @@ public class OrcRecordUpdater implements RecordUpdater {
     if (writer == null) {
       writer = OrcFile.createWriter(path, writerOptions);
       AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
-      AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs);
+      try {
+        AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs);
+      } catch (Exception e) {
+        e.printStackTrace();
+        // Ignore; might have been created by another concurrent writer, writing to a different bucket
+        // within this delta/base directory
+        LOG.trace(e.fillInStackTrace().toString());
+      }
     }
   }
 
@@ -806,4 +813,9 @@ public class OrcRecordUpdater implements RecordUpdater {
     bucket.set(bucketProperty);
     return currentBucketProperty;
   }
+
+  @Override
+  public Path getUpdatedFilePath() {
+    return path;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 0361872..598220b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -1134,7 +1134,7 @@ public class VectorizedOrcAcidRowBatchReader
         assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
         this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
                                                     validWriteIdList, readerOptions, deleteDeltas,
-                                                    mergerOptions);
+                                                    mergerOptions, null);
         this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
         this.deleteRecordValue = this.deleteRecords.createValue();
         // Initialize the first value in the delete reader.
@@ -1565,7 +1565,7 @@ public class VectorizedOrcAcidRowBatchReader
           for (Path deleteDeltaDir : deleteDeltaDirs) {
             FileSystem fs = deleteDeltaDir.getFileSystem(conf);
             Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                new OrcRawRecordMerger.Options().isCompacting(false));
+                new OrcRawRecordMerger.Options().isCompacting(false), null);
             for (Path deleteDeltaFile : deleteDeltaFiles) {
               // NOTE: Calling last flush length below is more for future-proofing when we have
               // streaming deletes. But currently we don't support streaming deletes, and this can
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 1eb9c12..d40a67f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2211,7 +2211,7 @@ public class Hive {
                                  boolean isSkewedStoreAsSubdir,
                                  boolean isSrcLocal, boolean isAcidIUDoperation,
                                  boolean resetStatistics, Long writeId,
-                                 int stmtId, boolean isInsertOverwrite) throws HiveException {
+                                 int stmtId, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
 
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
@@ -2228,7 +2228,7 @@ public class Hive {
     Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
             loadFileType, inheritTableSpecs,
             inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
-            resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+            resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles, isDirectInsert);
 
     AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
     if (tableSnapshot != null) {
@@ -2299,7 +2299,7 @@ public class Hive {
                         boolean inheritLocation, boolean isSkewedStoreAsSubdir,
                         boolean isSrcLocal, boolean isAcidIUDoperation, boolean resetStatistics,
                         Long writeId, int stmtId, boolean isInsertOverwrite,
-                        boolean isTxnTable, List<Path> newFiles) throws HiveException {
+                        boolean isTxnTable, List<Path> newFiles, boolean isDirectInsert) throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2346,15 +2346,18 @@ public class Hive {
       //       to ACID updates. So the are not themselves ACID.
 
       // Note: this assumes both paths are qualified; which they are, currently.
-      if (((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) ||
+      if (((isMmTableWrite || isDirectInsert || isFullAcidTable) && loadPath.equals(newPartPath)) ||
               (loadFileType == LoadFileType.IGNORE)) {
-        // MM insert query, move itself is a no-op.
+        // MM insert query or direct insert; move itself is a no-op.
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-          Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
+          Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM = " + isMmTableWrite
+              + ", Direct insert = " + isDirectInsert + ")");
         }
-        assert !isAcidIUDoperation;
         if (newFiles != null) {
-          listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTableWrite ? isInsertOverwrite : false, newFiles);
+          if (!isMmTableWrite && !isDirectInsert) {
+            isInsertOverwrite = false;
+          }
+          listFilesCreatedByQuery(loadPath, writeId, stmtId, isInsertOverwrite, newFiles);
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
@@ -2364,7 +2367,6 @@ public class Hive {
         // Either a non-MM query, or a load into MM table from an external source.
         Path destPath = newPartPath;
         if (isMmTableWrite) {
-          assert !isAcidIUDoperation;
           // We will load into MM directory, and hide previous directories if needed.
           destPath = new Path(destPath, isInsertOverwrite
               ? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
@@ -2783,11 +2785,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   private Set<Path> getValidPartitionsInPath(
       int numDP, int numLB, Path loadPath, Long writeId, int stmtId,
-      boolean isMmTable, boolean isInsertOverwrite) throws HiveException {
+      boolean isMmTable, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
     Set<Path> validPartitions = new HashSet<Path>();
     try {
       FileSystem fs = loadPath.getFileSystem(conf);
-      if (!isMmTable) {
+      if (!isMmTable || !isDirectInsert) {
         List<FileStatus> leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
         // Check for empty partitions
         for (FileStatus s : leafStatus) {
@@ -2805,7 +2807,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         //       we have multiple statements anyway is union.
         Utilities.FILE_OP_LOGGER.trace(
             "Looking for dynamic partitions in {} ({} levels)", loadPath, numDP);
-        Path[] leafStatus = Utilities.getMmDirectoryCandidates(
+        Path[] leafStatus = Utilities.getDirectInsertDirectoryCandidates(
             fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite);
         for (Path p : leafStatus) {
           Path dpPath = p.getParent(); // Skip the MM directory that we have found.
@@ -2853,7 +2855,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       final String tableName, final Map<String, String> partSpec, final LoadFileType loadFileType,
       final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId,
       final boolean resetStatistics, final AcidUtils.Operation operation,
-      boolean isInsertOverwrite) throws HiveException {
+      boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
 
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
@@ -2861,7 +2863,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
     final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
-        AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
+        AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite, isDirectInsert);
 
     final int partsToLoad = validPartitions.size();
     final AtomicInteger partitionsLoaded = new AtomicInteger(0);
@@ -2929,7 +2931,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
           // load the partition
           Partition partition = loadPartitionInternal(entry.getKey(), tbl,
                   fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
-                  resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+                  resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles, isDirectInsert);
           // if the partition already existed before the loading, no need to add it again to the
           // metastore
 
@@ -3081,7 +3083,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean resetStatistics,
-      Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
+      Long writeId, int stmtId, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
 
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
@@ -3098,7 +3100,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
 
     // Note: this assumes both paths are qualified; which they are, currently.
-    if (((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) {
+    if (((isMmTable || isDirectInsert || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) {
       /**
        * some operations on Transactional tables (e.g. Import) write directly to the final location
        * and avoid the 'move' operation.  Since MoveTask does other things, setting 'loadPath' to be
@@ -3111,14 +3113,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       //new files list is required only for event notification.
       if (newFiles != null) {
-        listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTable ? isInsertOverwrite : false, newFiles);
+        if (!isMmTable && !isDirectInsert) {
+          isInsertOverwrite = false;
+        }
+        listFilesCreatedByQuery(loadPath, writeId, stmtId, isInsertOverwrite, newFiles);
       }
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
       Path tblPath = tbl.getPath();
       Path destPath = tblPath;
       if (isMmTable) {
-        assert !isAcidIUDoperation;
         // We will load into MM directory, and hide previous directories if needed.
         destPath = new Path(destPath, isInsertOverwrite
             ? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 73ca658..1ea3bd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1382,7 +1382,7 @@ public final class GenMapRedUtils {
     Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : finalName;
 
     Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
-        mvTasks, fsopPath, fsInputDesc.isMmTable());
+        mvTasks, fsopPath, fsInputDesc.isMmTable(), fsInputDesc.isDirectInsert());
     ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
         fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState);
 
@@ -1869,8 +1869,8 @@ public final class GenMapRedUtils {
         .isSkewedStoredAsDir();
   }
 
-  public static Task<MoveWork> findMoveTaskForFsopOutput(
-      List<Task<MoveWork>> mvTasks, Path fsopFinalDir, boolean isMmFsop) {
+  public static Task<MoveWork> findMoveTaskForFsopOutput(List<Task<MoveWork>> mvTasks, Path fsopFinalDir,
+      boolean isMmFsop, boolean isDirectInsert) {
     // find the move task
     for (Task<MoveWork> mvTsk : mvTasks) {
       MoveWork mvWork = mvTsk.getWork();
@@ -1879,7 +1879,7 @@ public final class GenMapRedUtils {
       if (mvWork.getLoadFileWork() != null) {
         srcDir = mvWork.getLoadFileWork().getSourcePath();
         isLfd = true;
-        if (isMmFsop) {
+        if (isMmFsop || isDirectInsert) {
           srcDir = srcDir.getParent();
         }
       } else if (mvWork.getLoadTableWork() != null) {
@@ -1910,8 +1910,8 @@ public final class GenMapRedUtils {
 
     // no need of merging if the move is to a local file system
     // We are looking based on the original FSOP, so use the original path as is.
-    MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(
-        mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable());
+    MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(),
+        fsOp.getConf().isMmTable(), fsOp.getConf().isDirectInsert());
 
     // TODO: wtf?!! why is this in this method? This has nothing to do with anything.
     if (isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
@@ -1985,9 +1985,15 @@ public final class GenMapRedUtils {
 
     FileSinkDesc fileSinkDesc = fsOp.getConf();
     boolean isMmTable = fileSinkDesc.isMmTable();
+    boolean isDirectInsert = fileSinkDesc.isDirectInsert();
     if (chDir) {
       dest = fileSinkDesc.getMergeInputDirName();
-      if (!isMmTable) {
+      /**
+       * Skip temporary file generation for:
+       * 1. MM Tables
+       * 2. INSERT operation on full ACID table
+       */
+      if ((!isMmTable) && (!isDirectInsert)) {
         // generate the temporary file
         // it must be on the same file system as the current destination
         Context baseCtx = parseCtx.getContext();
@@ -2018,8 +2024,8 @@ public final class GenMapRedUtils {
     Task<MoveWork> mvTask = null;
 
     if (!chDir) {
-      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
-          mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable());
+      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(), isMmTable,
+          isDirectInsert);
     }
 
     // Set the move task to be dependent on the current task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 2328eed..fed890f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7329,6 +7329,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     Table destinationTable = null; // destination table if any
     boolean destTableIsTransactional;     // true for full ACID table and MM table
     boolean destTableIsFullAcid; // should the destination table be written to using ACID
+    boolean isDirectInsert = false; // should we add files directly to the final path
     boolean destTableIsTemporary = false;
     boolean destTableIsMaterialization = false;
     Partition destinationPartition = null;// destination partition if any
@@ -7342,7 +7343,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
-    boolean isMmTable = false, isMmCreate = false;
+    boolean isMmTable = false, isMmCreate = false, isNonNativeTable = false;
     Long writeId = null;
     HiveTxnManager txnMgr = getTxnMgr();
 
@@ -7386,13 +7387,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath());
       }
 
-      boolean isNonNativeTable = destinationTable.isNonNative();
+      isNonNativeTable = destinationTable.isNonNative();
       isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
-      if (isNonNativeTable || isMmTable) {
-        queryTmpdir = destinationPath;
-      } else {
-        queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath);
+      AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
+      // this table_desc does not contain the partitioning columns
+      tableDescriptor = Utilities.getTableDesc(destinationTable);
+
+      if (!isNonNativeTable) {
+        if (destTableIsTransactional) {
+          acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
+        }
       }
+      isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOp);
+      queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, isDirectInsert, destinationPath);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir
             + " from " + destinationPath);
@@ -7401,8 +7408,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // set the root of the temporary path where dynamic partition columns will populate
         dpCtx.setRootPath(queryTmpdir);
       }
-      // this table_desc does not contain the partitioning columns
-      tableDescriptor = Utilities.getTableDesc(destinationTable);
 
       // Add NOT NULL constraint check
       input = genConstraintsPlan(dest, qb, input);
@@ -7436,7 +7441,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // Create the work for moving the table
       // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
-        AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
         if (destTableIsTransactional) {
           acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
           checkAcidConstraints();
@@ -7465,10 +7469,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // deltas and base and leave them up to the cleaner to clean up
         boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
             destinationTable.getDbName(), destinationTable.getTableName());
-        LoadFileType loadType = (!isInsertInto && !destTableIsTransactional)
-            ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+        LoadFileType loadType;
+        if (isDirectInsert) {
+          loadType = LoadFileType.IGNORE;
+        } else if (!isInsertInto && !destTableIsTransactional) {
+          loadType = LoadFileType.REPLACE_ALL;
+        } else {
+          loadType = LoadFileType.KEEP_EXISTING;
+        }
         ltd.setLoadFileType(loadType);
         ltd.setInsertOverwrite(!isInsertInto);
+        ltd.setIsDirectInsert(isDirectInsert);
         ltd.setLbCtx(lbCtx);
         loadTableWork.add(ltd);
       } else {
@@ -7525,13 +7536,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }
       }
 
+      isNonNativeTable = destinationTable.isNonNative();
       isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
-      queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath);
+      AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
+      // this table_desc does not contain the partitioning columns
+      tableDescriptor = Utilities.getTableDesc(destinationTable);
+
+      if (!isNonNativeTable) {
+        if (destTableIsTransactional) {
+          acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
+        }
+      }
+      isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOp);
+      queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, isDirectInsert, destinationPath);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying "
             + queryTmpdir + " from " + destinationPath);
       }
-      tableDescriptor = Utilities.getTableDesc(destinationTable);
 
       // Add NOT NULL constraint check
       input = genConstraintsPlan(dest, qb, input);
@@ -7561,7 +7582,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(),
           destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(),
           destinationPartition.isStoredAsSubDirectories());
-      AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsTransactional) {
         acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
         checkAcidConstraints();
@@ -7589,10 +7609,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       boolean isInsertInto = !qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest);
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
-      LoadFileType loadType = (!isInsertInto && !destTableIsTransactional)
-          ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+      LoadFileType loadType;
+      if (isDirectInsert) {
+        loadType = LoadFileType.IGNORE;
+      } else if (!isInsertInto && !destTableIsTransactional) {
+        loadType = LoadFileType.REPLACE_ALL;
+      } else {
+        loadType = LoadFileType.KEEP_EXISTING;
+      }
       ltd.setLoadFileType(loadType);
       ltd.setInsertOverwrite(!isInsertInto);
+      ltd.setIsDirectInsert(isDirectInsert);
       ltd.setLbCtx(lbCtx);
 
       loadTableWork.add(ltd);
@@ -7818,7 +7845,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // the metastore table, then we move and commit the partitions. At least for the time being,
         // this order needs to be enforced because metastore expects a table to exist before we can
         // add any partitions to it.
-        boolean isNonNativeTable = tableDescriptor.isNonNative();
+        isNonNativeTable = tableDescriptor.isNonNative();
         if (!isNonNativeTable) {
           AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
           if (destTableIsTransactional) {
@@ -7905,7 +7932,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition,
         destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, destinationTable, writeId, isMmCreate, destType, qb);
+        canBeMerged, destinationTable, writeId, isMmCreate, destType, qb, isDirectInsert);
     if (isMmCreate) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
       if (tableDesc != null) {
@@ -7961,6 +7988,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return output;
   }
 
+  private boolean isDirectInsert(boolean destTableIsFullAcid, AcidUtils.Operation acidOp) {
+    boolean directInsertEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_ACID_DIRECT_INSERT_ENABLED);
+    boolean directInsert = directInsertEnabled && destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT);
+    if (LOG.isDebugEnabled() && directInsert) {
+      LOG.debug("Direct insert for ACID tables is enabled.");
+    }
+    return directInsert;
+  }
+
+  private Path getTmpDir(boolean isNonNativeTable, boolean isMmTable, boolean isDirectInsert,
+      Path destinationPath) {
+    /**
+     * We will directly insert to the final destination in the following cases:
+     * 1. Non native table
+     * 2. Micro-managed (insert only table)
+     * 3. Full ACID table and operation type is INSERT
+     */
+    if (isNonNativeTable || isMmTable || isDirectInsert) {
+      return destinationPath;
+    } else {
+      return ctx.getTempDirForFinalJobPath(destinationPath);
+    }
+  }
+
   private boolean useBatchingSerializer(String serdeClassName) {
     return SessionState.get().isHiveServerQuery() &&
       hasSetBatchSerializer(serdeClassName);
@@ -8115,7 +8166,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                                           boolean destTableIsMaterialization, Path queryTmpdir,
                                           SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
                                           RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas,
-                                          Integer dest_type, QB qb) throws SemanticException {
+                                          Integer dest_type, QB qb, boolean isDirectInsert) throws SemanticException {
     boolean isInsertOverwrite = false;
     switch (dest_type) {
     case QBMetaData.DEST_PARTITION:
@@ -8140,7 +8191,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
         canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
         dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(),
-        qb.isCTAS() || qb.isMaterializedView());
+        qb.isCTAS() || qb.isMaterializedView(), isDirectInsert);
 
     boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
     fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index c102a69..5fb5fd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -433,7 +433,7 @@ public class GenSparkUtils {
     Task<MoveWork> mvTask = null;
 
     if (!chDir) {
-      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false);
+      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false, false);
     }
 
     // Set the move task to be dependent on the current task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index ecc7bde..bbf73cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -112,6 +112,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
 
   private boolean isInsertOverwrite = false;
 
+  private boolean isDirectInsert = false;
+
   private boolean isQuery = false;
 
   private boolean isCTASorCM = false;
@@ -122,12 +124,10 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   /**
    * @param destPath - the final destination for data
    */
-  public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
-      final boolean compressed, final int destTableId, final boolean multiFileSpray,
-      final boolean canBeMerged, final int numFiles, final int totalFiles,
-      final List<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM) {
-
+  public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId,
+      final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles,
+      final List<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, Long mmWriteId,
+      boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM, boolean isDirectInsert) {
     this.dirName = dirName;
     this.tableInfo = tableInfo;
     this.compressed = compressed;
@@ -145,6 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     this.isInsertOverwrite = isInsertOverwrite;
     this.isQuery = isQuery;
     this.isCTASorCM = isCTASorCM;
+    this.isDirectInsert = isDirectInsert;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -164,9 +165,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
 
   @Override
   public Object clone() throws CloneNotSupportedException {
-    FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
-        destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isCTASorCM);
+    FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged,
+        numFiles, totalFiles, partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery,
+        isCTASorCM, isDirectInsert);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -184,6 +185,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     ret.setFilesToFetch(filesToFetch);
     ret.setIsQuery(isQuery);
     ret.setIsCTASorCM(isCTASorCM);
+    ret.setIsDirectInsert(isDirectInsert);
     return ret;
   }
 
@@ -223,6 +225,14 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     this.isUsingBatchingSerDe = isUsingBatchingSerDe;
   }
 
+  public void setIsDirectInsert(boolean isDirectInsert) {
+    this.isDirectInsert = isDirectInsert;
+  }
+
+  public boolean isDirectInsert() {
+    return this.isDirectInsert;
+  }
+
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
   public Path getDirName() {
     return dirName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index bed0581..a62b3cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -43,6 +43,7 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   private int stmtId;
   private Long currentWriteId;
   private boolean isInsertOverwrite;
+  private boolean isDirectInsert;
 
   // TODO: the below seem like they should just be combined into partitionDesc
   private Table mdTable;
@@ -235,6 +236,14 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
    this.isInsertOverwrite = v;
   }
 
+  public void setIsDirectInsert(boolean isDirectInsert) {
+    this.isDirectInsert = isDirectInsert;
+  }
+
+  public boolean isDirectInsert() {
+    return this.isDirectInsert;
+  }
+
   /**
    * @return the lbCtx
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 739f2b6..b4bc3c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -404,6 +404,7 @@ public class CompactorMR {
     private int bucketNum;
     private Path base;
     private Path[] deltas;
+    private Map<String, String> deltasToAttemptId;
 
     public CompactorInputSplit() {
     }
@@ -420,12 +421,13 @@ public class CompactorMR {
      * @throws IOException
      */
     CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base,
-                               Path[] deltas)
+                               Path[] deltas, Map<String, String> deltasToAttemptId)
         throws IOException {
       bucketNum = bucket;
       this.base = base;
       this.deltas = deltas;
       locations = new ArrayList<String>();
+      this.deltasToAttemptId = deltasToAttemptId;
 
       for (Path path : files) {
         FileSystem fs = path.getFileSystem(hadoopConf);
@@ -470,6 +472,13 @@ public class CompactorMR {
       for (int i = 0; i < deltas.length; i++) {
         dataOutput.writeInt(deltas[i].toString().length());
         dataOutput.writeBytes(deltas[i].toString());
+        String attemptId = deltasToAttemptId.get(deltas[i].toString());
+        if (attemptId == null) {
+          dataOutput.writeInt(0);
+        } else {
+          dataOutput.writeInt(attemptId.length());
+          dataOutput.writeBytes(attemptId.toString());
+        }
       }
 
     }
@@ -502,11 +511,20 @@ public class CompactorMR {
       }
       numElements = dataInput.readInt();
       deltas = new Path[numElements];
+      deltasToAttemptId = new HashMap<>();
       for (int i = 0; i < numElements; i++) {
         len = dataInput.readInt();
         buf = new byte[len];
         dataInput.readFully(buf);
         deltas[i] = new Path(new String(buf));
+        len = dataInput.readInt();
+        String attemptId = null;
+        if (len > 0) {
+          buf = new byte[len];
+          dataInput.readFully(buf);
+          attemptId = new String(buf);
+        }
+        deltasToAttemptId.put(deltas[i].toString(), attemptId);
       }
     }
 
@@ -516,6 +534,7 @@ public class CompactorMR {
       bucketNum = other.bucketNum;
       base = other.base;
       deltas = other.deltas;
+      deltasToAttemptId = other.deltasToAttemptId;
     }
 
     int getBucket() {
@@ -530,6 +549,10 @@ public class CompactorMR {
       return deltas;
     }
 
+    Map<String, String> getDeltasToAttemptId() {
+      return deltasToAttemptId;
+    }
+
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
@@ -587,7 +610,7 @@ public class CompactorMR {
             // For each file, figure out which bucket it is.
             Matcher matcher = isRawFormat ?
               AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName())
-              : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+              : AcidUtils.BUCKET_PATTERN.matcher(f.getPath().getName());
             addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
           }
         } else {
@@ -606,7 +629,7 @@ public class CompactorMR {
         // multiple ingestions of various sizes.
         Path[] deltasForSplit = isTableBucketed ? deltaDirs : getDeltaDirsFromBucketTracker(bt);
         splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets,
-            bt.sawBase ? baseDir : null, deltasForSplit));
+            bt.sawBase ? baseDir : null, deltasForSplit, bt.deltasToAttemptId));
       }
 
       LOG.debug("Returning " + splits.size() + " splits");
@@ -643,7 +666,15 @@ public class CompactorMR {
         //may be a data loss scenario
         throw new IllegalArgumentException(msg);
       }
-      int bucketNum = Integer.parseInt(matcher.group());
+      int bucketNum = -1;
+      String attemptId = null;
+      if (matcher.groupCount() > 0) {
+        bucketNum = Integer.parseInt(matcher.group(1));
+        attemptId = matcher.group(2) != null ? matcher.group(2).substring(1) : null;
+      } else {
+        bucketNum = Integer.parseInt(matcher.group());
+      }
+
       BucketTracker bt = splitToBucketMap.get(bucketNum);
       if (bt == null) {
         bt = new BucketTracker();
@@ -652,16 +683,19 @@ public class CompactorMR {
       LOG.debug("Adding " + file.toString() + " to list of files for splits");
       bt.buckets.add(file);
       bt.sawBase |= sawBase;
+      bt.deltasToAttemptId.put(file.getParent().toString(), attemptId);
     }
 
     private static class BucketTracker {
       BucketTracker() {
         sawBase = false;
         buckets = new ArrayList<Path>();
+        deltasToAttemptId = new HashMap<>();
       }
 
       boolean sawBase;
       List<Path> buckets;
+      Map<String, String> deltasToAttemptId;
     }
   }
 
@@ -734,7 +768,7 @@ public class CompactorMR {
       boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =
           aif.getRawReader(jobConf, isMajor, split.getBucket(),
-                  writeIdList, split.getBaseDir(), split.getDeltaDirs());
+                  writeIdList, split.getBaseDir(), split.getDeltaDirs(), split.getDeltasToAttemptId());
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java b/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java
index 58e6289..e9e49a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java
@@ -246,7 +246,6 @@ public class UpgradeTool {
    */
   static void handleRenameFiles(Table t, Path p, boolean execute, Configuration conf,
       boolean isBucketed, PrintWriter pw) throws IOException {
-    AcidUtils.BUCKET_DIGIT_PATTERN.matcher("foo");
     if (isBucketed) {
       /* For bucketed tables we assume that Hive wrote them and 0000M_0 and 0000M_0_copy_8
       are the only possibilities.  Since we can't move files across buckets the only thing we
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
index c9cb669..fa15e28 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
@@ -294,6 +294,6 @@ public class TestTxnAddPartition extends TxnCommandsBaseForTests {
     runStatementOnDriver("insert into Tstage partition(p=1) values(0,2),(1,4)");
 
     runStatementOnDriver("ALTER TABLE T ADD PARTITION (p=0) location '"
-        + getWarehouseDir() + "/tstage/p=1/delta_0000001_0000001_0000/bucket_00001'");
+        + getWarehouseDir() + "/tstage/p=1/delta_0000001_0000001_0000/bucket_00001_0'");
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 8421408..a1f59a8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -1167,7 +1167,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
       {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/000001_0"},
       {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/000001_0_copy_1"},
       {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/000001_0_copy_1"},
-      {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001"}
+      {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001_0"}
     };
     checkResult(expected, query, isVectorized, "before compact", LOG);
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e56d831..79dfb02 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -474,7 +474,7 @@ public class TestTxnCommands2 {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(1, buckets.length); // only one bucket file
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000_0"));
       } else {
         Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
       }
@@ -776,7 +776,7 @@ public class TestTxnCommands2 {
         } else if (numDelta == 2) {
           Assert.assertEquals("delta_10000002_10000002_0000", status[i].getPath().getName());
           Assert.assertEquals(1, buckets.length);
-          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+          Assert.assertEquals("bucket_00000_0", buckets[0].getPath().getName());
         }
       } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
         numDeleteDelta++;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 908ceb4..4f4bca2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -76,7 +76,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S";
     String[][] expected = new String[][] {
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
-            "s/delta_0000001_0000001_0000/bucket_00000"},
+            "s/delta_0000001_0000001_0000/bucket_00000_0"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
             "s/delta_0000002_0000002_0000/bucket_00000"}};
     checkResult(expected, testQuery, false, "check data", LOG);
@@ -273,14 +273,14 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
         "select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b";
     String[][] expected = new String[][]{
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttoday",
-            "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001"},
+            "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001_0"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttoday",
-            "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000"},
+            "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000_0"},
 
         {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttomorrow",
-            "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001"},
+            "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001_0"},
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttomorrow",
-            "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000"}};
+            "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000_0"}};
     checkResult(expected, testQuery, isVectorized, "after insert", LOG);
 
     runStatementOnDriver("update acid_uap set b = 'fred'");
@@ -324,9 +324,9 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T";
     String[][] expected = new String[][] {
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2",
-            "t/delta_0000001_0000001_0000/bucket_00000"},
+            "t/delta_0000001_0000001_0000/bucket_00000_0"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t4",
-            "t/delta_0000002_0000002_0000/bucket_00000"}};
+            "t/delta_0000002_0000002_0000/bucket_00000_0"}};
     checkResult(expected, testQuery, false, "check data", LOG);
 
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index 8676e0d..6394429 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -61,9 +61,9 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4",
             "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
-            "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001_0"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t8\t8",
-            "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001_0"}};
     checkResult(expected, testQuery, false, "check data", LOG);
 
     /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
@@ -100,11 +100,11 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
             "acidtblpart/p=p1/delta_0000002_0000002_0000/bucket_00001"},
         {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
-            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001_0"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
-            "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001"},
+            "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001_0"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
-            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}};
     checkResult(expected, testQuery, false, "check data", LOG);
 
     /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
@@ -124,11 +124,11 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
             "acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
         {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
-            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001_0"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
             "acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
-            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}};
 
     checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 66b2b27..ba53417 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -379,7 +379,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
         "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0",
-            "t/p=10/delta_0000001_0000001_0000/bucket_00000"},
+            "t/p=10/delta_0000001_0000001_0000/bucket_00000_0"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4",
             "t/p=11/delta_0000002_0000002_0000/000000_0"},
         {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index bb55d9f..125c76a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -126,13 +126,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String[][] expectedInter2 = new String[][] {
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000"}
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"}
     };
     checkResult(expectedInter2, testQuery, isVectorized, "insert");
     runStatementOnDriver("delete from T where a = 3");
     String[][] expectedInter3 = new String[][] {
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000"}
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"}
     };
     checkResult(expectedInter3, testQuery, isVectorized, "delete");
     //test minor compaction
@@ -165,7 +165,7 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String[][] expected5 = new String[][]{
         {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
         {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
-        {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"}
+        {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000_0"}
     };
     checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
 
@@ -199,8 +199,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
         //normal insert
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"},
         //Load Data
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}};
@@ -444,8 +444,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"},
         {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"},
         {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"}
     };
@@ -483,8 +483,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"}
     };
     checkResult(expected, testQuery, isVectorized, "load data inpath");
   }
@@ -505,7 +505,7 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     List<String> rs = runStatementOnDriver("select INPUT__FILE__NAME from T");
     Assert.assertEquals(1, rs.size());
     Assert.assertTrue("Unexpcted file name", rs.get(0)
-        .endsWith("t/delta_0000001_0000001_0000/bucket_00000"));
+        .endsWith("t/delta_0000001_0000001_0000/bucket_00000_0"));
     //T2 is an acid table so this should fail
     CommandProcessorException e =
         runStatementOnDriverNegative("load data local inpath '" + rs.get(0) + "' into table T2");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index ea6b1d9..88d5d04 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -48,6 +48,7 @@ import java.util.Set;
 
 public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnNoBuckets.class);
+  private static final String NO_BUCKETS_TBL_NAME = "nobuckets";
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnNoBuckets.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
@@ -80,13 +81,13 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     runStatementOnDriver("create table tmp (c1 integer, c2 integer, c3 integer) stored as orc tblproperties('transactional'='false')");
     runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1));
     runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2));
-    runStatementOnDriver("drop table if exists nobuckets");
-    runStatementOnDriver("create table nobuckets (c1 integer, c2 integer, c3 integer) stored " +
+    runStatementOnDriver(String.format("drop table if exists %s", NO_BUCKETS_TBL_NAME));
+    runStatementOnDriver("create table " + NO_BUCKETS_TBL_NAME + " (c1 integer, c2 integer, c3 integer) stored " +
       "as orc tblproperties('transactional'='true', 'transactional_properties'='default')");
-    String stmt = "insert into nobuckets select * from tmp";
+    String stmt = String.format("insert into %s select * from tmp", NO_BUCKETS_TBL_NAME);
     runStatementOnDriver(stmt);
     List<String> rs = runStatementOnDriver(
-      "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID");
+        String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by ROW__ID", NO_BUCKETS_TBL_NAME));
     Assert.assertEquals("", 4, rs.size());
     LOG.warn("after insert");
     for(String s : rs) {
@@ -96,54 +97,58 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
      * The number in the file name is writerId.  This is the number encoded in ROW__ID.bucketId -
      * see {@link org.apache.hadoop.hive.ql.io.BucketCodec}*/
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t0\t"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"));
 
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
-    rs = runStatementOnDriver("explain  update nobuckets set c3 = 17 where c3 in(0,1)");
+    rs = runStatementOnDriver(String.format("explain  update %s set c3 = 17 where c3 in(0,1)", NO_BUCKETS_TBL_NAME));
     LOG.warn("Query Plan: ");
     for (String s : rs) {
       LOG.warn(s);
     }
 
-    runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
-    rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+    runStatementOnDriver(String.format("update %s set c3 = 17 where c3 in(0,1)", NO_BUCKETS_TBL_NAME));
+    rs = runStatementOnDriver(
+        String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID",
+            NO_BUCKETS_TBL_NAME));
     LOG.warn("after update");
     for(String s : rs) {
       LOG.warn(s);
     }
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"));
     //so update has 1 writer, but which creates buckets where the new rows land
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000"));
     // update for "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001"));
 
     Set<String> expectedFiles = new HashSet<>();
     //both delete events land in corresponding buckets to the original row-ids
-    expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00000");
-    expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00001");
-    expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00000");
-    expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00001");
-    expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00000");
-    expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00001");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00001");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001");
     //check that we get the right files on disk
-    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
     //todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
     // methods to print to a supplied stream but those are package private
 
-    runStatementOnDriver("alter table nobuckets compact 'major'");
+    runStatementOnDriver(String.format("alter table %s compact 'major'", NO_BUCKETS_TBL_NAME));
     TestTxnCommands2.runWorker(hiveConf);
-    rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+    rs = runStatementOnDriver(
+        String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID",
+            NO_BUCKETS_TBL_NAME));
     LOG.warn("after major compact");
     for(String s : rs) {
       LOG.warn(s);
@@ -163,37 +168,37 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     */
 
     String expected[][] = {
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", "nobuckets/base_0000002_v0000025/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", "nobuckets/base_0000002_v0000025/bucket_00001"},
-        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", "nobuckets/base_0000002_v0000025/bucket_00001"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", "nobuckets/base_0000002_v0000025/bucket_00000"}
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000"}
     };
     checkResult(expected,
         "select ROW__ID, c1, c2, c3" + (shouldVectorize() ? "" : ", INPUT__FILE__NAME")
-            + " from nobuckets order by c1, c2, c3",
+            + " from " + NO_BUCKETS_TBL_NAME + " order by c1, c2, c3",
         shouldVectorize(),
         "After Major Compaction", LOG);
 
     expectedFiles.clear();
-    expectedFiles.add("obuckets/delete_delta_0000002_0000002_0000/bucket_00000");
-    expectedFiles.add("obuckets/delete_delta_0000002_0000002_0000/bucket_00001");
-    expectedFiles.add("house/nobuckets/delta_0000001_0000001_0000/bucket_00000");
-    expectedFiles.add("house/nobuckets/delta_0000001_0000001_0000/bucket_00001");
-    expectedFiles.add("house/nobuckets/delta_0000002_0000002_0000/bucket_00000");
-    expectedFiles.add("house/nobuckets/delta_0000002_0000002_0000/bucket_00001");
-    expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00000");
-    expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00001");
-    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00001");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001");
+    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
 
     TestTxnCommands2.runCleaner(hiveConf);
-    rs = runStatementOnDriver("select c1, c2, c3 from nobuckets order by c1, c2, c3");
+    rs = runStatementOnDriver(String.format("select c1, c2, c3 from %s order by c1, c2, c3", NO_BUCKETS_TBL_NAME));
     int[][] result = {{0,0,17},{1,1,17},{2,2,2},{3,3,3}};
     Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs);
 
     expectedFiles.clear();
-    expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00000");
-    expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00001");
-    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001");
+    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
   }
 
   @Test
@@ -209,13 +214,13 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2));
     runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals3));
     runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals4));
-    runStatementOnDriver("drop table if exists nobuckets");
-    runStatementOnDriver("create table nobuckets (c1 integer, c2 integer) partitioned by (c3 integer) stored " +
-      "as orc tblproperties('transactional'='true', 'transactional_properties'='default')");
-    String stmt = "insert into nobuckets partition(c3) select * from tmp";
+    runStatementOnDriver("drop table if exists " + NO_BUCKETS_TBL_NAME);
+    runStatementOnDriver(String.format("create table %s (c1 integer, c2 integer) partitioned by (c3 integer) stored " +
+      "as orc tblproperties('transactional'='true', 'transactional_properties'='default')", NO_BUCKETS_TBL_NAME));
+    String stmt = String.format("insert into %s partition(c3) select * from tmp", NO_BUCKETS_TBL_NAME);
     runStatementOnDriver(stmt);
     List<String> rs = runStatementOnDriver(
-      "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID");
+      String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by ROW__ID", NO_BUCKETS_TBL_NAME));
     Assert.assertEquals("", 8, rs.size());
     LOG.warn("after insert");
     for(String s : rs) {
@@ -223,16 +228,17 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     }
 
     rs = runStatementOnDriver(
-      "select * from nobuckets where c2 in (0,3)");
+      String.format("select * from %s where c2 in (0,3)", NO_BUCKETS_TBL_NAME));
     Assert.assertEquals(3, rs.size());
-    runStatementOnDriver("update nobuckets set c2 = 17 where c2 in(0,3)");
-    rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+    runStatementOnDriver(String.format("update %s set c2 = 17 where c2 in(0,3)", NO_BUCKETS_TBL_NAME));
+    rs = runStatementOnDriver(
+        String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID",
+            NO_BUCKETS_TBL_NAME));
     LOG.warn("after update");
     for(String s : rs) {
       LOG.warn(s);
     }
-    rs = runStatementOnDriver(
-      "select * from nobuckets where c2=17");
+    rs = runStatementOnDriver(String.format("select * from %s where c2=17", NO_BUCKETS_TBL_NAME));
     Assert.assertEquals(3, rs.size());
   }
 
@@ -332,11 +338,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID");
 
     String expected[][] = {
-        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001_0"},
     };
     checkExpected(rs, expected, "Unexpected row count after ctas");
   }
@@ -563,9 +569,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13",
             "bucket_00000", "000000_0_copy_1"},
         {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t0\t15",
-            "bucket_00000", "bucket_00000"},
+            "bucket_00000", "bucket_00000_0"},
         {"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t0\t17",
-            "bucket_00000", "bucket_00000"},
+            "bucket_00000", "bucket_00000_0"},
         {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t0\t120",
             "bucket_00000", "bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
@@ -577,7 +583,7 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6",
             "bucket_00000", "000000_0_copy_2"},
         {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":1}\t1\t16",
-            "bucket_00000", "bucket_00000"}
+            "bucket_00000", "bucket_00000_0"}
     };
     Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
     for(int i = 0; i < expected.length; i++) {
@@ -792,14 +798,14 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     String query = "select ROW__ID, p, q, a, b, INPUT__FILE__NAME from T order by p, q, a, b";
     List<String> rs = runStatementOnDriver(query);
     String[][] expected = {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000_0"}
     };
     checkExpected(rs, expected, "insert data");
 
@@ -810,10 +816,10 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     query = "select ROW__ID, p, q, a, b, INPUT__FILE__NAME from T order by p, q, a, b";
     rs = runStatementOnDriver(query);
     String[][] expected2 = {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000020/bucket_00000"},
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000020/bucket_00000"},
         {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000020/bucket_00000"},
@@ -842,8 +848,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     List<String> rs = runStatementOnDriver(query);
     String[][] expected = {
         //this proves data is written in Acid layout so T was made Acid
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"}
     };
     checkExpected(rs, expected, "insert data");
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index af14e62..1435269 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
@@ -231,14 +233,8 @@ public abstract class TxnCommandsBaseForTests {
    * @param expectedFiles - suffixes of expected Paths.  Must be the same length
    * @param rootPath - table or partition root where to start looking for actual files, recursively
    */
-  void assertExpectedFileSet(Set<String> expectedFiles, String rootPath) throws Exception {
-    int suffixLength = 0;
-    for(String s : expectedFiles) {
-      if(suffixLength > 0) {
-        assert suffixLength == s.length() : "all entries must be the same length. current: " + s;
-      }
-      suffixLength = s.length();
-    }
+  void assertExpectedFileSet(Set<String> expectedFiles, String rootPath, String tableName) throws Exception {
+    Pattern pattern = Pattern.compile("(.+)/(" + tableName + "/[delete_delta|delta|base].+)");
     FileSystem fs = FileSystem.get(hiveConf);
     Set<String> actualFiles = new HashSet<>();
     RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(rootPath), true);
@@ -246,7 +242,10 @@ public abstract class TxnCommandsBaseForTests {
       LocatedFileStatus lfs = remoteIterator.next();
       if(!lfs.isDirectory() && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) {
         String p = lfs.getPath().toString();
-        actualFiles.add(p.substring(p.length() - suffixLength, p.length()));
+        Matcher matcher = pattern.matcher(p);
+        if (matcher.matches()) {
+          actualFiles.add(matcher.group(2));
+        }
       }
     }
     Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index 83db48e..801133d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -146,7 +146,7 @@ public class TestExecDriver {
         db.createTable(src, cols, null, TextInputFormat.class,
             HiveIgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
-            true, false, false, true, null, 0, false);
+           true, false, false, true, null, 0, false, false);
         i++;
       }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 2c4b69b..ebb51c4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ public class TestFileSinkOperator {
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
-          false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
+          false, 1, 1, partCols, dpCtx, null, null, false, false, false, false, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }
@@ -705,7 +705,8 @@ public class TestFileSinkOperator {
                                               int bucket,
                                               ValidWriteIdList validWriteIdList,
                                               Path baseDirectory,
-                                              Path[] deltaDirectory) throws
+                                              Path[] deltaDirectory,
+                                              Map<String,String> deltaToAttemptId) throws
         IOException {
       return null;
     }
@@ -778,6 +779,11 @@ public class TestFileSinkOperator {
         public long getBufferedRowCount() {
           return records.size();
         }
+
+        @Override
+        public Path getUpdatedFilePath() {
+          return null;
+        }
       };
     }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 48e9afc..80fb1af 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -1243,6 +1243,7 @@ public class TestDbTxnManager2 {
 
     //================
     //test with predicates such that partition pruning doesn't kick in
+    driver.run("drop table if exists tab1");
     driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"); //txnid:4
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index cfd7290..9a9ab53 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -383,7 +383,7 @@ public abstract class CompactorTest {
     @Override
     public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents, int bucket,
                                         ValidWriteIdList validWriteIdList,
-                                        Path baseDirectory, Path... deltaDirectory) throws IOException {
+                                        Path baseDirectory, Path[] deltaDirectory, Map<String, String> deltaToAttemptId) throws IOException {
 
       List<Path> filesToRead = new ArrayList<Path>();
       if (baseDirectory != null) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 70ae85c..443f982 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -162,7 +162,7 @@ public class TestWorker extends CompactorTest {
     deltas[1] = new Path(delta2);
 
     CompactorMR.CompactorInputSplit split =
-        new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas);
+        new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas, new HashMap<String, String>());
 
     Assert.assertEquals(520L, split.getLength());
     String[] locations = split.getLocations();
@@ -207,7 +207,7 @@ public class TestWorker extends CompactorTest {
     deltas[1] = new Path(delta2);
 
     CompactorMR.CompactorInputSplit split =
-        new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas);
+        new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas, new HashMap<String, String>());
 
     ByteArrayOutputStream buf = new ByteArrayOutputStream();
     DataOutput out = new DataOutputStream(buf);
diff --git a/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition.q b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition.q
new file mode 100644
index 0000000..554ac30
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition.q
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.support.concurrency=true;
+set hive.acid.direct.insert.enabled=true;
+
+create table dummy_n2(i int);
+insert into table dummy_n2 values (1);
+select * from dummy_n2;
+
+create table partunion1(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true');
+
+insert into table partunion1 partition(part1)
+select temps.* from (
+select 1 as id1, '2014' as part1 from dummy_n2 
+union all 
+select 2 as id1, '2014' as part1 from dummy_n2 ) temps;
+
+select * from partunion1;
diff --git a/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition_2.q b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition_2.q
new file mode 100644
index 0000000..3cdaef8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition_2.q
@@ -0,0 +1,25 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.support.concurrency=true;
+set hive.acid.direct.insert.enabled=true;
+
+drop table if exists dummy_n7;
+drop table if exists partunion1_n0;
+ 
+create table dummy_n7(i int);
+insert into table dummy_n7 values (1);
+select * from dummy_n7;
+
+create table partunion1_n0(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true');
+
+set hive.merge.tezfiles=true;
+
+insert into table partunion1_n0 partition(part1)
+select 1 as id1, '2014' as part1 from dummy_n7 
+union all 
+select 2 as id1, '2014' as part1 from dummy_n7;
+
+select * from partunion1_n0;
+
+drop table dummy_n7;
+drop table partunion1_n0;
diff --git a/ql/src/test/queries/clientpositive/tez_acid_union_multiinsert.q b/ql/src/test/queries/clientpositive/tez_acid_union_multiinsert.q
new file mode 100644
index 0000000..788d850
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_acid_union_multiinsert.q
@@ -0,0 +1,94 @@
+--! qt:dataset:src
+set hive.explain.user=false;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.support.concurrency=true;
+set hive.acid.direct.insert.enabled=true;
+
+-- SORT_QUERY_RESULTS
+
+DROP TABLE IF EXISTS DEST1_acid_1;
+DROP TABLE IF EXISTS DEST1_acid_2;
+DROP TABLE IF EXISTS DEST1_acid_3;
+DROP TABLE IF EXISTS DEST1_acid_4;
+DROP TABLE IF EXISTS DEST1_acid_5;
+DROP TABLE IF EXISTS DEST1_acid_6;
+
+CREATE TABLE DEST1_acid_1(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_2(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_3(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_4(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_5(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_6(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_7(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_8(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_9(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_10(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+
+FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+                         UNION all
+      select key, value from src s0
+                             ) unionsrc
+INSERT INTO TABLE DEST1_acid_1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_1;
+select * from DEST1_acid_2;
+
+FROM (
+      select key, value from src s0
+                         UNION all
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT INTO TABLE DEST1_acid_3 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_4 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_3;
+select * from DEST1_acid_4;
+
+FROM (
+      select key, value from src s0
+                         UNION all
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_5 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_6 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_5;
+select * from DEST1_acid_6;
+
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_7 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_8 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_7;
+select * from DEST1_acid_8;
+
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION distinct 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_9 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_10 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_9;
+select * from DEST1_acid_10;
+
+DROP TABLE IF EXISTS DEST1_acid_1;
+DROP TABLE IF EXISTS DEST1_acid_2;
+DROP TABLE IF EXISTS DEST1_acid_3;
+DROP TABLE IF EXISTS DEST1_acid_4;
+DROP TABLE IF EXISTS DEST1_acid_5;
+DROP TABLE IF EXISTS DEST1_acid_6;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/acid_subquery.q.out b/ql/src/test/results/clientpositive/acid_subquery.q.out
index 1dc1775..4ca2e5a 100644
--- a/ql/src/test/results/clientpositive/acid_subquery.q.out
+++ b/ql/src/test/results/clientpositive/acid_subquery.q.out
@@ -95,8 +95,17 @@ POSTHOOK: Input: default@target@p=2/q=2
 POSTHOOK: Output: default@merge_tmp_table
 POSTHOOK: Output: default@target@p=1/q=2
 POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
 POSTHOOK: Output: default@target@p=1/q=3
 POSTHOOK: Output: default@target@p=1/q=3
 POSTHOOK: Output: default@target@p=2/q=2
 POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(target)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (target)t.FieldSchema(name:p, type:int, comment:null), (target)t.FieldSchema(name:q, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
diff --git a/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out b/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out
index e324d5e..04b16a0 100644
--- a/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out
+++ b/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out
@@ -190,8 +190,17 @@ POSTHOOK: Input: default@target@p=2/q=2
 POSTHOOK: Output: default@merge_tmp_table
 POSTHOOK: Output: default@target@p=1/q=2
 POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
 POSTHOOK: Output: default@target@p=1/q=3
 POSTHOOK: Output: default@target@p=1/q=3
 POSTHOOK: Output: default@target@p=2/q=2
 POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(target)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (target)t.FieldSchema(name:p, type:int, comment:null), (target)t.FieldSchema(name:q, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
diff --git a/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out b/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out
index 61b0057..f9c7060 100644
--- a/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out
+++ b/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out
@@ -73,8 +73,12 @@ insert into table encryptedTable_n0 partition (key)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@encryptedtable_n0@key=238
+POSTHOOK: Output: default@encryptedtable_n0@key=501
+POSTHOOK: Output: default@encryptedtable_n0@key=502
 POSTHOOK: Output: default@encryptedtable_n0@key=86
 POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=238).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=501).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=502).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
 POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=86).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
 PREHOOK: query: select * from encryptedTable_n0 order by key
 PREHOOK: type: QUERY
diff --git a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
index 03b8dc3..699398b 100644
--- a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -611,13 +611,25 @@ POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@merge_tmp_table
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acid)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acid)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acid)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
 PREHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acid
@@ -626,7 +638,7 @@ POSTHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@srcpart_acid
 #### A masked pattern was here ####
-0
+497
 PREHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated by merge'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acid
@@ -1150,13 +1162,25 @@ POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@merge_tmp_table
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidb)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidb)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidb)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
 PREHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acidb
@@ -1165,7 +1189,7 @@ POSTHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@srcpart_acidb
 #### A masked pattern was here ####
-0
+497
 PREHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acidb
@@ -1997,13 +2021,25 @@ POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@merge_tmp_table
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidv)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidv)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidv)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
 PREHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acidv
@@ -2012,7 +2048,7 @@ POSTHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@srcpart_acidv
 #### A masked pattern was here ####
-0
+497
 PREHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acidv
@@ -2853,13 +2889,25 @@ POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@merge_tmp_table
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidvb)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidvb)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidvb)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
 PREHOOK: query: select count(*) from srcpart_acidvb where ds='2008-04-08' and hr=='12'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acidvb
@@ -2868,7 +2916,7 @@ POSTHOOK: query: select count(*) from srcpart_acidvb where ds='2008-04-08' and h
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@srcpart_acidvb
 #### A masked pattern was here ####
-0
+497
 PREHOOK: query: select ds, hr, key, value from srcpart_acidvb where value like '%updated by merge'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart_acidvb
diff --git a/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out b/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out
index fbc3326..1ee38d3 100644
--- a/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out
+++ b/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out
@@ -372,6 +372,12 @@ PREHOOK: Output: default@int_part
 POSTHOOK: query: INSERT OVERWRITE TABLE int_part PARTITION (par) SELECT * FROM b
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@b
+POSTHOOK: Output: default@int_part@par=1
+POSTHOOK: Output: default@int_part@par=2
+POSTHOOK: Output: default@int_part@par=3
+POSTHOOK: Lineage: int_part PARTITION(par=1).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=2).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=3).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
 PREHOOK: query: SELECT count(*) FROM int_part
 PREHOOK: type: QUERY
 PREHOOK: Input: default@int_part
@@ -429,7 +435,11 @@ POSTHOOK: query: INSERT OVERWRITE TABLE int_part PARTITION (par) SELECT * FROM b
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@b
 POSTHOOK: Output: default@int_part@par=1
+POSTHOOK: Output: default@int_part@par=2
+POSTHOOK: Output: default@int_part@par=3
 POSTHOOK: Lineage: int_part PARTITION(par=1).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=2).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=3).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
 PREHOOK: query: SELECT count(*) FROM int_part
 PREHOOK: type: QUERY
 PREHOOK: Input: default@int_part
diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out
index 226f2a9..fd28d39 100644
--- a/ql/src/test/results/clientpositive/llap/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -1633,6 +1633,7 @@ POSTHOOK: Input: default@intermediate_n0@p=455
 POSTHOOK: Input: default@intermediate_n0@p=456
 POSTHOOK: Input: default@intermediate_n0@p=457
 POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
 POSTHOOK: Output: default@multi1_mm@p=455
 POSTHOOK: Output: default@multi1_mm@p=456
 POSTHOOK: Output: default@multi1_mm@p=457
@@ -1646,6 +1647,8 @@ POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
 PREHOOK: query: select key, key2, p from multi1_mm order by key, key2, p
 PREHOOK: type: QUERY
 PREHOOK: Input: default@multi1_mm
@@ -1713,6 +1716,18 @@ POSTHOOK: Input: default@intermediate_n0@p=455
 POSTHOOK: Input: default@intermediate_n0@p=456
 POSTHOOK: Input: default@intermediate_n0@p=457
 POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
+POSTHOOK: Output: default@multi1_mm@p=455
+POSTHOOK: Output: default@multi1_mm@p=456
+POSTHOOK: Output: default@multi1_mm@p=457
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
diff --git a/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition.q.out b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition.q.out
new file mode 100644
index 0000000..07b0efb
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition.q.out
@@ -0,0 +1,63 @@
+PREHOOK: query: create table dummy_n2(i int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dummy_n2
+POSTHOOK: query: create table dummy_n2(i int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dummy_n2
+PREHOOK: query: insert into table dummy_n2 values (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@dummy_n2
+POSTHOOK: query: insert into table dummy_n2 values (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@dummy_n2
+POSTHOOK: Lineage: dummy_n2.i SCRIPT []
+PREHOOK: query: select * from dummy_n2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from dummy_n2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+PREHOOK: query: create table partunion1(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partunion1
+POSTHOOK: query: create table partunion1(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partunion1
+PREHOOK: query: insert into table partunion1 partition(part1)
+select temps.* from (
+select 1 as id1, '2014' as part1 from dummy_n2 
+union all 
+select 2 as id1, '2014' as part1 from dummy_n2 ) temps
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n2
+PREHOOK: Output: default@partunion1
+POSTHOOK: query: insert into table partunion1 partition(part1)
+select temps.* from (
+select 1 as id1, '2014' as part1 from dummy_n2 
+union all 
+select 2 as id1, '2014' as part1 from dummy_n2 ) temps
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n2
+POSTHOOK: Output: default@partunion1@part1=2014
+POSTHOOK: Lineage: partunion1 PARTITION(part1=2014).id1 EXPRESSION []
+PREHOOK: query: select * from partunion1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partunion1
+PREHOOK: Input: default@partunion1@part1=2014
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from partunion1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partunion1
+POSTHOOK: Input: default@partunion1@part1=2014
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	2014
+2	2014
diff --git a/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition_2.q.out b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition_2.q.out
new file mode 100644
index 0000000..7e664e4
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition_2.q.out
@@ -0,0 +1,85 @@
+PREHOOK: query: drop table if exists dummy_n7
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists dummy_n7
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists partunion1_n0
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists partunion1_n0
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table dummy_n7(i int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dummy_n7
+POSTHOOK: query: create table dummy_n7(i int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dummy_n7
+PREHOOK: query: insert into table dummy_n7 values (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@dummy_n7
+POSTHOOK: query: insert into table dummy_n7 values (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@dummy_n7
+POSTHOOK: Lineage: dummy_n7.i SCRIPT []
+PREHOOK: query: select * from dummy_n7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n7
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from dummy_n7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n7
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+PREHOOK: query: create table partunion1_n0(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partunion1_n0
+POSTHOOK: query: create table partunion1_n0(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partunion1_n0
+PREHOOK: query: insert into table partunion1_n0 partition(part1)
+select 1 as id1, '2014' as part1 from dummy_n7 
+union all 
+select 2 as id1, '2014' as part1 from dummy_n7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n7
+PREHOOK: Output: default@partunion1_n0
+POSTHOOK: query: insert into table partunion1_n0 partition(part1)
+select 1 as id1, '2014' as part1 from dummy_n7 
+union all 
+select 2 as id1, '2014' as part1 from dummy_n7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n7
+POSTHOOK: Output: default@partunion1_n0@part1=2014
+POSTHOOK: Lineage: partunion1_n0 PARTITION(part1=2014).id1 EXPRESSION []
+PREHOOK: query: select * from partunion1_n0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partunion1_n0
+PREHOOK: Input: default@partunion1_n0@part1=2014
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from partunion1_n0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partunion1_n0
+POSTHOOK: Input: default@partunion1_n0@part1=2014
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	2014
+2	2014
+PREHOOK: query: drop table dummy_n7
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dummy_n7
+PREHOOK: Output: default@dummy_n7
+POSTHOOK: query: drop table dummy_n7
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dummy_n7
+POSTHOOK: Output: default@dummy_n7
+PREHOOK: query: drop table partunion1_n0
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partunion1_n0
+PREHOOK: Output: default@partunion1_n0
+POSTHOOK: query: drop table partunion1_n0
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partunion1_n0
+POSTHOOK: Output: default@partunion1_n0
diff --git a/ql/src/test/results/clientpositive/llap/tez_acid_union_multiinsert.q.out b/ql/src/test/results/clientpositive/llap/tez_acid_union_multiinsert.q.out
new file mode 100644
index 0000000..d91a321
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/tez_acid_union_multiinsert.q.out
@@ -0,0 +1,3481 @@
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE DEST1_acid_1(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_1
+POSTHOOK: query: CREATE TABLE DEST1_acid_1(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_1
+PREHOOK: query: CREATE TABLE DEST1_acid_2(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_2
+POSTHOOK: query: CREATE TABLE DEST1_acid_2(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_2
+PREHOOK: query: CREATE TABLE DEST1_acid_3(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_3
+POSTHOOK: query: CREATE TABLE DEST1_acid_3(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_3
+PREHOOK: query: CREATE TABLE DEST1_acid_4(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_4
+POSTHOOK: query: CREATE TABLE DEST1_acid_4(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_4
+PREHOOK: query: CREATE TABLE DEST1_acid_5(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_5
+POSTHOOK: query: CREATE TABLE DEST1_acid_5(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_5
+PREHOOK: query: CREATE TABLE DEST1_acid_6(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_6
+POSTHOOK: query: CREATE TABLE DEST1_acid_6(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_6
+PREHOOK: query: CREATE TABLE DEST1_acid_7(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_7
+POSTHOOK: query: CREATE TABLE DEST1_acid_7(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_7
+PREHOOK: query: CREATE TABLE DEST1_acid_8(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_8
+POSTHOOK: query: CREATE TABLE DEST1_acid_8(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_8
+PREHOOK: query: CREATE TABLE DEST1_acid_9(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_9
+POSTHOOK: query: CREATE TABLE DEST1_acid_9(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_9
+PREHOOK: query: CREATE TABLE DEST1_acid_10(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_10
+POSTHOOK: query: CREATE TABLE DEST1_acid_10(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_10
+PREHOOK: query: FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+                         UNION all
+      select key, value from src s0
+                             ) unionsrc
+INSERT INTO TABLE DEST1_acid_1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_1
+PREHOOK: Output: default@dest1_acid_2
+POSTHOOK: query: FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+                         UNION all
+      select key, value from src s0
+                             ) unionsrc
+INSERT INTO TABLE DEST1_acid_1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_1
+POSTHOOK: Output: default@dest1_acid_2
+POSTHOOK: Lineage: dest1_acid_1.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), (src)s0.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_1.value EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), (src)s0.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_2.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), (src)s0.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_2.val1 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), (src)s0.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_2.val2 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), (src)s0.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_1
+#### A masked pattern was here ####
+0	1
+10	1
+100	1
+103	1
+104	1
+105	1
+11	1
+111	1
+113	1
+114	1
+116	1
+118	1
+119	1
+12	1
+120	1
+125	1
+126	1
+128	1
+129	1
+131	1
+133	1
+134	1
+136	1
+137	1
+138	1
+143	1
+145	1
+146	1
+149	1
+15	1
+150	1
+152	1
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	1
+165	1
+166	1
+167	1
+168	1
+169	1
+17	1
+170	1
+172	1
+174	1
+175	1
+176	1
+177	1
+178	1
+179	1
+18	1
+180	1
+181	1
+183	1
+186	1
+187	1
+189	1
+19	1
+190	1
+191	1
+192	1
+193	1
+194	1
+195	1
+196	1
+197	1
+199	1
+2	1
+20	1
+200	1
+201	1
+202	1
+203	1
+205	1
+207	1
+208	1
+209	1
+213	1
+214	1
+216	1
+217	1
+218	1
+219	1
+221	1
+222	1
+223	1
+224	1
+226	1
+228	1
+229	1
+230	1
+233	1
+235	1
+237	1
+238	1
+239	1
+24	1
+241	1
+242	1
+244	1
+247	1
+248	1
+249	1
+252	1
+255	1
+256	1
+257	1
+258	1
+26	1
+260	1
+262	1
+263	1
+265	1
+266	1
+27	1
+272	1
+273	1
+274	1
+275	1
+277	1
+278	1
+28	1
+280	1
+281	1
+282	1
+283	1
+284	1
+285	1
+286	1
+287	1
+288	1
+289	1
+291	1
+292	1
+296	1
+298	1
+30	1
+302	1
+305	1
+306	1
+307	1
+308	1
+309	1
+310	1
+311	1
+315	1
+316	1
+317	1
+318	1
+321	1
+322	1
+323	1
+325	1
+327	1
+33	1
+331	1
+332	1
+333	1
+335	1
+336	1
+338	1
+339	1
+34	1
+341	1
+342	1
+344	1
+345	1
+348	1
+35	1
+351	1
+353	1
+356	1
+360	1
+362	1
+364	1
+365	1
+366	1
+367	1
+368	1
+369	1
+37	1
+373	1
+374	1
+375	1
+377	1
+378	1
+379	1
+382	1
+384	1
+386	1
+389	1
+392	1
+393	1
+394	1
+395	1
+396	1
+397	1
+399	1
+4	1
+400	1
+401	1
+402	1
+403	1
+404	1
+406	1
+407	1
+409	1
+41	1
+411	1
+413	1
+414	1
+417	1
+418	1
+419	1
+42	1
+421	1
+424	1
+427	1
+429	1
+43	1
+430	1
+431	1
+432	1
+435	1
+436	1
+437	1
+438	1
+439	1
+44	1
+443	1
+444	1
+446	1
+448	1
+449	1
+452	1
+453	1
+454	1
+455	1
+457	1
+458	1
+459	1
+460	1
+462	1
+463	1
+466	1
+467	1
+468	1
+469	1
+47	1
+470	1
+472	1
+475	1
+477	1
+478	1
+479	1
+480	1
+481	1
+482	1
+483	1
+484	1
+485	1
+487	1
+489	1
+490	1
+491	1
+492	1
+493	1
+494	1
+495	1
+496	1
+497	1
+498	1
+5	1
+51	1
+53	1
+54	1
+57	1
+58	1
+64	1
+65	1
+66	1
+67	1
+69	1
+70	1
+72	1
+74	1
+76	1
+77	1
+78	1
+8	1
+80	1
+82	1
+83	1
+84	1
+85	1
+86	1
+87	1
+9	1
+90	1
+92	1
+95	1
+96	1
+97	1
+98	1
+tst1	1
+PREHOOK: query: select * from DEST1_acid_2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_2
+#### A masked pattern was here ####
+0	val_0	1
+10	val_10	1
+100	val_100	1
+103	val_103	1
+104	val_104	1
+105	val_105	1
+11	val_11	1
+111	val_111	1
+113	val_113	1
+114	val_114	1
+116	val_116	1
+118	val_118	1
+119	val_119	1
+12	val_12	1
+120	val_120	1
+125	val_125	1
+126	val_126	1
+128	val_128	1
+129	val_129	1
+131	val_131	1
+133	val_133	1
+134	val_134	1
+136	val_136	1
+137	val_137	1
+138	val_138	1
+143	val_143	1
+145	val_145	1
+146	val_146	1
+149	val_149	1
+15	val_15	1
+150	val_150	1
+152	val_152	1
+153	val_153	1
+155	val_155	1
+156	val_156	1
+157	val_157	1
+158	val_158	1
+160	val_160	1
+162	val_162	1
+163	val_163	1
+164	val_164	1
+165	val_165	1
+166	val_166	1
+167	val_167	1
+168	val_168	1
+169	val_169	1
+17	val_17	1
+170	val_170	1
+172	val_172	1
+174	val_174	1
+175	val_175	1
+176	val_176	1
+177	val_177	1
+178	val_178	1
+179	val_179	1
+18	val_18	1
+180	val_180	1
+181	val_181	1
+183	val_183	1
+186	val_186	1
+187	val_187	1
+189	val_189	1
+19	val_19	1
+190	val_190	1
+191	val_191	1
+192	val_192	1
+193	val_193	1
+194	val_194	1
+195	val_195	1
+196	val_196	1
+197	val_197	1
+199	val_199	1
+2	val_2	1
+20	val_20	1
+200	val_200	1
+201	val_201	1
+202	val_202	1
+203	val_203	1
+205	val_205	1
+207	val_207	1
+208	val_208	1
+209	val_209	1
+213	val_213	1
+214	val_214	1
+216	val_216	1
+217	val_217	1
+218	val_218	1
+219	val_219	1
+221	val_221	1
+222	val_222	1
+223	val_223	1
+224	val_224	1
+226	val_226	1
+228	val_228	1
+229	val_229	1
+230	val_230	1
+233	val_233	1
+235	val_235	1
+237	val_237	1
+238	val_238	1
+239	val_239	1
+24	val_24	1
+241	val_241	1
+242	val_242	1
+244	val_244	1
+247	val_247	1
+248	val_248	1
+249	val_249	1
+252	val_252	1
+255	val_255	1
+256	val_256	1
+257	val_257	1
+258	val_258	1
+26	val_26	1
+260	val_260	1
+262	val_262	1
+263	val_263	1
+265	val_265	1
+266	val_266	1
+27	val_27	1
+272	val_272	1
+273	val_273	1
+274	val_274	1
+275	val_275	1
+277	val_277	1
+278	val_278	1
+28	val_28	1
+280	val_280	1
+281	val_281	1
+282	val_282	1
+283	val_283	1
+284	val_284	1
+285	val_285	1
+286	val_286	1
+287	val_287	1
+288	val_288	1
+289	val_289	1
+291	val_291	1
+292	val_292	1
+296	val_296	1
+298	val_298	1
+30	val_30	1
+302	val_302	1
+305	val_305	1
+306	val_306	1
+307	val_307	1
+308	val_308	1
+309	val_309	1
+310	val_310	1
+311	val_311	1
+315	val_315	1
+316	val_316	1
+317	val_317	1
+318	val_318	1
+321	val_321	1
+322	val_322	1
+323	val_323	1
+325	val_325	1
+327	val_327	1
+33	val_33	1
+331	val_331	1
+332	val_332	1
+333	val_333	1
+335	val_335	1
+336	val_336	1
+338	val_338	1
+339	val_339	1
+34	val_34	1
+341	val_341	1
+342	val_342	1
+344	val_344	1
+345	val_345	1
+348	val_348	1
+35	val_35	1
+351	val_351	1
+353	val_353	1
+356	val_356	1
+360	val_360	1
+362	val_362	1
+364	val_364	1
+365	val_365	1
+366	val_366	1
+367	val_367	1
+368	val_368	1
+369	val_369	1
+37	val_37	1
+373	val_373	1
+374	val_374	1
+375	val_375	1
+377	val_377	1
+378	val_378	1
+379	val_379	1
+382	val_382	1
+384	val_384	1
+386	val_386	1
+389	val_389	1
+392	val_392	1
+393	val_393	1
+394	val_394	1
+395	val_395	1
+396	val_396	1
+397	val_397	1
+399	val_399	1
+4	val_4	1
+400	val_400	1
+401	val_401	1
+402	val_402	1
+403	val_403	1
+404	val_404	1
+406	val_406	1
+407	val_407	1
+409	val_409	1
+41	val_41	1
+411	val_411	1
+413	val_413	1
+414	val_414	1
+417	val_417	1
+418	val_418	1
+419	val_419	1
+42	val_42	1
+421	val_421	1
+424	val_424	1
+427	val_427	1
+429	val_429	1
+43	val_43	1
+430	val_430	1
+431	val_431	1
+432	val_432	1
+435	val_435	1
+436	val_436	1
+437	val_437	1
+438	val_438	1
+439	val_439	1
+44	val_44	1
+443	val_443	1
+444	val_444	1
+446	val_446	1
+448	val_448	1
+449	val_449	1
+452	val_452	1
+453	val_453	1
+454	val_454	1
+455	val_455	1
+457	val_457	1
+458	val_458	1
+459	val_459	1
+460	val_460	1
+462	val_462	1
+463	val_463	1
+466	val_466	1
+467	val_467	1
+468	val_468	1
+469	val_469	1
+47	val_47	1
+470	val_470	1
+472	val_472	1
+475	val_475	1
+477	val_477	1
+478	val_478	1
+479	val_479	1
+480	val_480	1
+481	val_481	1
+482	val_482	1
+483	val_483	1
+484	val_484	1
+485	val_485	1
+487	val_487	1
+489	val_489	1
+490	val_490	1
+491	val_491	1
+492	val_492	1
+493	val_493	1
+494	val_494	1
+495	val_495	1
+496	val_496	1
+497	val_497	1
+498	val_498	1
+5	val_5	1
+51	val_51	1
+53	val_53	1
+54	val_54	1
+57	val_57	1
+58	val_58	1
+64	val_64	1
+65	val_65	1
+66	val_66	1
+67	val_67	1
+69	val_69	1
+70	val_70	1
+72	val_72	1
+74	val_74	1
+76	val_76	1
+77	val_77	1
+78	val_78	1
+8	val_8	1
+80	val_80	1
+82	val_82	1
+83	val_83	1
+84	val_84	1
+85	val_85	1
+86	val_86	1
+87	val_87	1
+9	val_9	1
+90	val_90	1
+92	val_92	1
+95	val_95	1
+96	val_96	1
+97	val_97	1
+98	val_98	1
+tst1	500	1
+PREHOOK: query: FROM (
+      select key, value from src s0
+                         UNION all
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT INTO TABLE DEST1_acid_3 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_4 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_3
+PREHOOK: Output: default@dest1_acid_4
+POSTHOOK: query: FROM (
+      select key, value from src s0
+                         UNION all
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT INTO TABLE DEST1_acid_3 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_4 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_3
+POSTHOOK: Output: default@dest1_acid_4
+POSTHOOK: Lineage: dest1_acid_3.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_3.value EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_4.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_4.val1 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_4.val2 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_3
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_3
+#### A masked pattern was here ####
+0	1
+10	1
+100	1
+103	1
+104	1
+105	1
+11	1
+111	1
+113	1
+114	1
+116	1
+118	1
+119	1
+12	1
+120	1
+125	1
+126	1
+128	1
+129	1
+131	1
+133	1
+134	1
+136	1
+137	1
+138	1
+143	1
+145	1
+146	1
+149	1
+15	1
+150	1
+152	1
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	1
+165	1
+166	1
+167	1
+168	1
+169	1
+17	1
+170	1
+172	1
+174	1
+175	1
+176	1
+177	1
+178	1
+179	1
+18	1
+180	1
+181	1
+183	1
+186	1
+187	1
+189	1
+19	1
+190	1
+191	1
+192	1
+193	1
+194	1
+195	1
+196	1
+197	1
+199	1
+2	1
+20	1
+200	1
+201	1
+202	1
+203	1
+205	1
+207	1
+208	1
+209	1
+213	1
+214	1
+216	1
+217	1
+218	1
+219	1
+221	1
+222	1
+223	1
+224	1
+226	1
+228	1
+229	1
+230	1
+233	1
+235	1
+237	1
+238	1
+239	1
+24	1
+241	1
+242	1
+244	1
+247	1
+248	1
+249	1
+252	1
+255	1
+256	1
+257	1
+258	1
+26	1
+260	1
+262	1
+263	1
+265	1
+266	1
+27	1
+272	1
+273	1
+274	1
+275	1
+277	1
+278	1
+28	1
+280	1
+281	1
+282	1
+283	1
+284	1
+285	1
+286	1
+287	1
+288	1
+289	1
+291	1
+292	1
+296	1
+298	1
+30	1
+302	1
+305	1
+306	1
+307	1
+308	1
+309	1
+310	1
+311	1
+315	1
+316	1
+317	1
+318	1
+321	1
+322	1
+323	1
+325	1
+327	1
+33	1
+331	1
+332	1
+333	1
+335	1
+336	1
+338	1
+339	1
+34	1
+341	1
+342	1
+344	1
+345	1
+348	1
+35	1
+351	1
+353	1
+356	1
+360	1
+362	1
+364	1
+365	1
+366	1
+367	1
+368	1
+369	1
+37	1
+373	1
+374	1
+375	1
+377	1
+378	1
+379	1
+382	1
+384	1
+386	1
+389	1
+392	1
+393	1
+394	1
+395	1
+396	1
+397	1
+399	1
+4	1
+400	1
+401	1
+402	1
+403	1
+404	1
+406	1
+407	1
+409	1
+41	1
+411	1
+413	1
+414	1
+417	1
+418	1
+419	1
+42	1
+421	1
+424	1
+427	1
+429	1
+43	1
+430	1
+431	1
+432	1
+435	1
+436	1
+437	1
+438	1
+439	1
+44	1
+443	1
+444	1
+446	1
+448	1
+449	1
+452	1
+453	1
+454	1
+455	1
+457	1
+458	1
+459	1
+460	1
+462	1
+463	1
+466	1
+467	1
+468	1
+469	1
+47	1
+470	1
+472	1
+475	1
+477	1
+478	1
+479	1
+480	1
+481	1
+482	1
+483	1
+484	1
+485	1
+487	1
+489	1
+490	1
+491	1
+492	1
+493	1
+494	1
+495	1
+496	1
+497	1
+498	1
+5	1
+51	1
+53	1
+54	1
+57	1
+58	1
+64	1
+65	1
+66	1
+67	1
+69	1
+70	1
+72	1
+74	1
+76	1
+77	1
+78	1
+8	1
+80	1
+82	1
+83	1
+84	1
+85	1
+86	1
+87	1
+9	1
+90	1
+92	1
+95	1
+96	1
+97	1
+98	1
+tst1	1
+PREHOOK: query: select * from DEST1_acid_4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_4
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_4
+#### A masked pattern was here ####
+0	val_0	1
+10	val_10	1
+100	val_100	1
+103	val_103	1
+104	val_104	1
+105	val_105	1
+11	val_11	1
+111	val_111	1
+113	val_113	1
+114	val_114	1
+116	val_116	1
+118	val_118	1
+119	val_119	1
+12	val_12	1
+120	val_120	1
+125	val_125	1
+126	val_126	1
+128	val_128	1
+129	val_129	1
+131	val_131	1
+133	val_133	1
+134	val_134	1
+136	val_136	1
+137	val_137	1
+138	val_138	1
+143	val_143	1
+145	val_145	1
+146	val_146	1
+149	val_149	1
+15	val_15	1
+150	val_150	1
+152	val_152	1
+153	val_153	1
+155	val_155	1
+156	val_156	1
+157	val_157	1
+158	val_158	1
+160	val_160	1
+162	val_162	1
+163	val_163	1
+164	val_164	1
+165	val_165	1
+166	val_166	1
+167	val_167	1
+168	val_168	1
+169	val_169	1
+17	val_17	1
+170	val_170	1
+172	val_172	1
+174	val_174	1
+175	val_175	1
+176	val_176	1
+177	val_177	1
+178	val_178	1
+179	val_179	1
+18	val_18	1
+180	val_180	1
+181	val_181	1
+183	val_183	1
+186	val_186	1
+187	val_187	1
+189	val_189	1
+19	val_19	1
+190	val_190	1
+191	val_191	1
+192	val_192	1
+193	val_193	1
+194	val_194	1
+195	val_195	1
+196	val_196	1
+197	val_197	1
+199	val_199	1
+2	val_2	1
+20	val_20	1
+200	val_200	1
+201	val_201	1
+202	val_202	1
+203	val_203	1
+205	val_205	1
+207	val_207	1
+208	val_208	1
+209	val_209	1
+213	val_213	1
+214	val_214	1
+216	val_216	1
+217	val_217	1
+218	val_218	1
+219	val_219	1
+221	val_221	1
+222	val_222	1
+223	val_223	1
+224	val_224	1
+226	val_226	1
+228	val_228	1
+229	val_229	1
+230	val_230	1
+233	val_233	1
+235	val_235	1
+237	val_237	1
+238	val_238	1
+239	val_239	1
+24	val_24	1
+241	val_241	1
+242	val_242	1
+244	val_244	1
+247	val_247	1
+248	val_248	1
+249	val_249	1
+252	val_252	1
+255	val_255	1
+256	val_256	1
+257	val_257	1
+258	val_258	1
+26	val_26	1
+260	val_260	1
+262	val_262	1
+263	val_263	1
+265	val_265	1
+266	val_266	1
+27	val_27	1
+272	val_272	1
+273	val_273	1
+274	val_274	1
+275	val_275	1
+277	val_277	1
+278	val_278	1
+28	val_28	1
+280	val_280	1
+281	val_281	1
+282	val_282	1
+283	val_283	1
+284	val_284	1
+285	val_285	1
+286	val_286	1
+287	val_287	1
+288	val_288	1
+289	val_289	1
+291	val_291	1
+292	val_292	1
+296	val_296	1
+298	val_298	1
+30	val_30	1
+302	val_302	1
+305	val_305	1
+306	val_306	1
+307	val_307	1
+308	val_308	1
+309	val_309	1
+310	val_310	1
+311	val_311	1
+315	val_315	1
+316	val_316	1
+317	val_317	1
+318	val_318	1
+321	val_321	1
+322	val_322	1
+323	val_323	1
+325	val_325	1
+327	val_327	1
+33	val_33	1
+331	val_331	1
+332	val_332	1
+333	val_333	1
+335	val_335	1
+336	val_336	1
+338	val_338	1
+339	val_339	1
+34	val_34	1
+341	val_341	1
+342	val_342	1
+344	val_344	1
+345	val_345	1
+348	val_348	1
+35	val_35	1
+351	val_351	1
+353	val_353	1
+356	val_356	1
+360	val_360	1
+362	val_362	1
+364	val_364	1
+365	val_365	1
+366	val_366	1
+367	val_367	1
+368	val_368	1
+369	val_369	1
+37	val_37	1
+373	val_373	1
+374	val_374	1
+375	val_375	1
+377	val_377	1
+378	val_378	1
+379	val_379	1
+382	val_382	1
+384	val_384	1
+386	val_386	1
+389	val_389	1
+392	val_392	1
+393	val_393	1
+394	val_394	1
+395	val_395	1
+396	val_396	1
+397	val_397	1
+399	val_399	1
+4	val_4	1
+400	val_400	1
+401	val_401	1
+402	val_402	1
+403	val_403	1
+404	val_404	1
+406	val_406	1
+407	val_407	1
+409	val_409	1
+41	val_41	1
+411	val_411	1
+413	val_413	1
+414	val_414	1
+417	val_417	1
+418	val_418	1
+419	val_419	1
+42	val_42	1
+421	val_421	1
+424	val_424	1
+427	val_427	1
+429	val_429	1
+43	val_43	1
+430	val_430	1
+431	val_431	1
+432	val_432	1
+435	val_435	1
+436	val_436	1
+437	val_437	1
+438	val_438	1
+439	val_439	1
+44	val_44	1
+443	val_443	1
+444	val_444	1
+446	val_446	1
+448	val_448	1
+449	val_449	1
+452	val_452	1
+453	val_453	1
+454	val_454	1
+455	val_455	1
+457	val_457	1
+458	val_458	1
+459	val_459	1
+460	val_460	1
+462	val_462	1
+463	val_463	1
+466	val_466	1
+467	val_467	1
+468	val_468	1
+469	val_469	1
+47	val_47	1
+470	val_470	1
+472	val_472	1
+475	val_475	1
+477	val_477	1
+478	val_478	1
+479	val_479	1
+480	val_480	1
+481	val_481	1
+482	val_482	1
+483	val_483	1
+484	val_484	1
+485	val_485	1
+487	val_487	1
+489	val_489	1
+490	val_490	1
+491	val_491	1
+492	val_492	1
+493	val_493	1
+494	val_494	1
+495	val_495	1
+496	val_496	1
+497	val_497	1
+498	val_498	1
+5	val_5	1
+51	val_51	1
+53	val_53	1
+54	val_54	1
+57	val_57	1
+58	val_58	1
+64	val_64	1
+65	val_65	1
+66	val_66	1
+67	val_67	1
+69	val_69	1
+70	val_70	1
+72	val_72	1
+74	val_74	1
+76	val_76	1
+77	val_77	1
+78	val_78	1
+8	val_8	1
+80	val_80	1
+82	val_82	1
+83	val_83	1
+84	val_84	1
+85	val_85	1
+86	val_86	1
+87	val_87	1
+9	val_9	1
+90	val_90	1
+92	val_92	1
+95	val_95	1
+96	val_96	1
+97	val_97	1
+98	val_98	1
+tst1	500	1
+PREHOOK: query: FROM (
+      select key, value from src s0
+                         UNION all
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_5 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_6 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_5
+PREHOOK: Output: default@dest1_acid_6
+POSTHOOK: query: FROM (
+      select key, value from src s0
+                         UNION all
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_5 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_6 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_5
+POSTHOOK: Output: default@dest1_acid_6
+POSTHOOK: Lineage: dest1_acid_5.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_5.value EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_6.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_6.val1 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_6.val2 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_5
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_5
+#### A masked pattern was here ####
+0	1
+10	1
+100	1
+103	1
+104	1
+105	1
+11	1
+111	1
+113	1
+114	1
+116	1
+118	1
+119	1
+12	1
+120	1
+125	1
+126	1
+128	1
+129	1
+131	1
+133	1
+134	1
+136	1
+137	1
+138	1
+143	1
+145	1
+146	1
+149	1
+15	1
+150	1
+152	1
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	1
+165	1
+166	1
+167	1
+168	1
+169	1
+17	1
+170	1
+172	1
+174	1
+175	1
+176	1
+177	1
+178	1
+179	1
+18	1
+180	1
+181	1
+183	1
+186	1
+187	1
+189	1
+19	1
+190	1
+191	1
+192	1
+193	1
+194	1
+195	1
+196	1
+197	1
+199	1
+2	1
+20	1
+200	1
+201	1
+202	1
+203	1
+205	1
+207	1
+208	1
+209	1
+213	1
+214	1
+216	1
+217	1
+218	1
+219	1
+221	1
+222	1
+223	1
+224	1
+226	1
+228	1
+229	1
+230	1
+233	1
+235	1
+237	1
+238	1
+239	1
+24	1
+241	1
+242	1
+244	1
+247	1
+248	1
+249	1
+252	1
+255	1
+256	1
+257	1
+258	1
+26	1
+260	1
+262	1
+263	1
+265	1
+266	1
+27	1
+272	1
+273	1
+274	1
+275	1
+277	1
+278	1
+28	1
+280	1
+281	1
+282	1
+283	1
+284	1
+285	1
+286	1
+287	1
+288	1
+289	1
+291	1
+292	1
+296	1
+298	1
+30	1
+302	1
+305	1
+306	1
+307	1
+308	1
+309	1
+310	1
+311	1
+315	1
+316	1
+317	1
+318	1
+321	1
+322	1
+323	1
+325	1
+327	1
+33	1
+331	1
+332	1
+333	1
+335	1
+336	1
+338	1
+339	1
+34	1
+341	1
+342	1
+344	1
+345	1
+348	1
+35	1
+351	1
+353	1
+356	1
+360	1
+362	1
+364	1
+365	1
+366	1
+367	1
+368	1
+369	1
+37	1
+373	1
+374	1
+375	1
+377	1
+378	1
+379	1
+382	1
+384	1
+386	1
+389	1
+392	1
+393	1
+394	1
+395	1
+396	1
+397	1
+399	1
+4	1
+400	1
+401	1
+402	1
+403	1
+404	1
+406	1
+407	1
+409	1
+41	1
+411	1
+413	1
+414	1
+417	1
+418	1
+419	1
+42	1
+421	1
+424	1
+427	1
+429	1
+43	1
+430	1
+431	1
+432	1
+435	1
+436	1
+437	1
+438	1
+439	1
+44	1
+443	1
+444	1
+446	1
+448	1
+449	1
+452	1
+453	1
+454	1
+455	1
+457	1
+458	1
+459	1
+460	1
+462	1
+463	1
+466	1
+467	1
+468	1
+469	1
+47	1
+470	1
+472	1
+475	1
+477	1
+478	1
+479	1
+480	1
+481	1
+482	1
+483	1
+484	1
+485	1
+487	1
+489	1
+490	1
+491	1
+492	1
+493	1
+494	1
+495	1
+496	1
+497	1
+498	1
+5	1
+51	1
+53	1
+54	1
+57	1
+58	1
+64	1
+65	1
+66	1
+67	1
+69	1
+70	1
+72	1
+74	1
+76	1
+77	1
+78	1
+8	1
+80	1
+82	1
+83	1
+84	1
+85	1
+86	1
+87	1
+9	1
+90	1
+92	1
+95	1
+96	1
+97	1
+98	1
+tst1	1
+PREHOOK: query: select * from DEST1_acid_6
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_6
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_6
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_6
+#### A masked pattern was here ####
+0	val_0	1
+10	val_10	1
+100	val_100	1
+103	val_103	1
+104	val_104	1
+105	val_105	1
+11	val_11	1
+111	val_111	1
+113	val_113	1
+114	val_114	1
+116	val_116	1
+118	val_118	1
+119	val_119	1
+12	val_12	1
+120	val_120	1
+125	val_125	1
+126	val_126	1
+128	val_128	1
+129	val_129	1
+131	val_131	1
+133	val_133	1
+134	val_134	1
+136	val_136	1
+137	val_137	1
+138	val_138	1
+143	val_143	1
+145	val_145	1
+146	val_146	1
+149	val_149	1
+15	val_15	1
+150	val_150	1
+152	val_152	1
+153	val_153	1
+155	val_155	1
+156	val_156	1
+157	val_157	1
+158	val_158	1
+160	val_160	1
+162	val_162	1
+163	val_163	1
+164	val_164	1
+165	val_165	1
+166	val_166	1
+167	val_167	1
+168	val_168	1
+169	val_169	1
+17	val_17	1
+170	val_170	1
+172	val_172	1
+174	val_174	1
+175	val_175	1
+176	val_176	1
+177	val_177	1
+178	val_178	1
+179	val_179	1
+18	val_18	1
+180	val_180	1
+181	val_181	1
+183	val_183	1
+186	val_186	1
+187	val_187	1
+189	val_189	1
+19	val_19	1
+190	val_190	1
+191	val_191	1
+192	val_192	1
+193	val_193	1
+194	val_194	1
+195	val_195	1
+196	val_196	1
+197	val_197	1
+199	val_199	1
+2	val_2	1
+20	val_20	1
+200	val_200	1
+201	val_201	1
+202	val_202	1
+203	val_203	1
+205	val_205	1
+207	val_207	1
+208	val_208	1
+209	val_209	1
+213	val_213	1
+214	val_214	1
+216	val_216	1
+217	val_217	1
+218	val_218	1
+219	val_219	1
+221	val_221	1
+222	val_222	1
+223	val_223	1
+224	val_224	1
+226	val_226	1
+228	val_228	1
+229	val_229	1
+230	val_230	1
+233	val_233	1
+235	val_235	1
+237	val_237	1
+238	val_238	1
+239	val_239	1
+24	val_24	1
+241	val_241	1
+242	val_242	1
+244	val_244	1
+247	val_247	1
+248	val_248	1
+249	val_249	1
+252	val_252	1
+255	val_255	1
+256	val_256	1
+257	val_257	1
+258	val_258	1
+26	val_26	1
+260	val_260	1
+262	val_262	1
+263	val_263	1
+265	val_265	1
+266	val_266	1
+27	val_27	1
+272	val_272	1
+273	val_273	1
+274	val_274	1
+275	val_275	1
+277	val_277	1
+278	val_278	1
+28	val_28	1
+280	val_280	1
+281	val_281	1
+282	val_282	1
+283	val_283	1
+284	val_284	1
+285	val_285	1
+286	val_286	1
+287	val_287	1
+288	val_288	1
+289	val_289	1
+291	val_291	1
+292	val_292	1
+296	val_296	1
+298	val_298	1
+30	val_30	1
+302	val_302	1
+305	val_305	1
+306	val_306	1
+307	val_307	1
+308	val_308	1
+309	val_309	1
+310	val_310	1
+311	val_311	1
+315	val_315	1
+316	val_316	1
+317	val_317	1
+318	val_318	1
+321	val_321	1
+322	val_322	1
+323	val_323	1
+325	val_325	1
+327	val_327	1
+33	val_33	1
+331	val_331	1
+332	val_332	1
+333	val_333	1
+335	val_335	1
+336	val_336	1
+338	val_338	1
+339	val_339	1
+34	val_34	1
+341	val_341	1
+342	val_342	1
+344	val_344	1
+345	val_345	1
+348	val_348	1
+35	val_35	1
+351	val_351	1
+353	val_353	1
+356	val_356	1
+360	val_360	1
+362	val_362	1
+364	val_364	1
+365	val_365	1
+366	val_366	1
+367	val_367	1
+368	val_368	1
+369	val_369	1
+37	val_37	1
+373	val_373	1
+374	val_374	1
+375	val_375	1
+377	val_377	1
+378	val_378	1
+379	val_379	1
+382	val_382	1
+384	val_384	1
+386	val_386	1
+389	val_389	1
+392	val_392	1
+393	val_393	1
+394	val_394	1
+395	val_395	1
+396	val_396	1
+397	val_397	1
+399	val_399	1
+4	val_4	1
+400	val_400	1
+401	val_401	1
+402	val_402	1
+403	val_403	1
+404	val_404	1
+406	val_406	1
+407	val_407	1
+409	val_409	1
+41	val_41	1
+411	val_411	1
+413	val_413	1
+414	val_414	1
+417	val_417	1
+418	val_418	1
+419	val_419	1
+42	val_42	1
+421	val_421	1
+424	val_424	1
+427	val_427	1
+429	val_429	1
+43	val_43	1
+430	val_430	1
+431	val_431	1
+432	val_432	1
+435	val_435	1
+436	val_436	1
+437	val_437	1
+438	val_438	1
+439	val_439	1
+44	val_44	1
+443	val_443	1
+444	val_444	1
+446	val_446	1
+448	val_448	1
+449	val_449	1
+452	val_452	1
+453	val_453	1
+454	val_454	1
+455	val_455	1
+457	val_457	1
+458	val_458	1
+459	val_459	1
+460	val_460	1
+462	val_462	1
+463	val_463	1
+466	val_466	1
+467	val_467	1
+468	val_468	1
+469	val_469	1
+47	val_47	1
+470	val_470	1
+472	val_472	1
+475	val_475	1
+477	val_477	1
+478	val_478	1
+479	val_479	1
+480	val_480	1
+481	val_481	1
+482	val_482	1
+483	val_483	1
+484	val_484	1
+485	val_485	1
+487	val_487	1
+489	val_489	1
+490	val_490	1
+491	val_491	1
+492	val_492	1
+493	val_493	1
+494	val_494	1
+495	val_495	1
+496	val_496	1
+497	val_497	1
+498	val_498	1
+5	val_5	1
+51	val_51	1
+53	val_53	1
+54	val_54	1
+57	val_57	1
+58	val_58	1
+64	val_64	1
+65	val_65	1
+66	val_66	1
+67	val_67	1
+69	val_69	1
+70	val_70	1
+72	val_72	1
+74	val_74	1
+76	val_76	1
+77	val_77	1
+78	val_78	1
+8	val_8	1
+80	val_80	1
+82	val_82	1
+83	val_83	1
+84	val_84	1
+85	val_85	1
+86	val_86	1
+87	val_87	1
+9	val_9	1
+90	val_90	1
+92	val_92	1
+95	val_95	1
+96	val_96	1
+97	val_97	1
+98	val_98	1
+tst1	500	1
+PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_7 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_8 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_7
+PREHOOK: Output: default@dest1_acid_8
+POSTHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_7 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_8 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_7
+POSTHOOK: Output: default@dest1_acid_8
+POSTHOOK: Lineage: dest1_acid_7.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_7.value EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_8.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_8.val1 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_8.val2 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_7
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_7
+#### A masked pattern was here ####
+0	1
+10	1
+100	1
+103	1
+104	1
+105	1
+11	1
+111	1
+113	1
+114	1
+116	1
+118	1
+119	1
+12	1
+120	1
+125	1
+126	1
+128	1
+129	1
+131	1
+133	1
+134	1
+136	1
+137	1
+138	1
+143	1
+145	1
+146	1
+149	1
+15	1
+150	1
+152	1
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	1
+165	1
+166	1
+167	1
+168	1
+169	1
+17	1
+170	1
+172	1
+174	1
+175	1
+176	1
+177	1
+178	1
+179	1
+18	1
+180	1
+181	1
+183	1
+186	1
+187	1
+189	1
+19	1
+190	1
+191	1
+192	1
+193	1
+194	1
+195	1
+196	1
+197	1
+199	1
+2	1
+20	1
+200	1
+201	1
+202	1
+203	1
+205	1
+207	1
+208	1
+209	1
+213	1
+214	1
+216	1
+217	1
+218	1
+219	1
+221	1
+222	1
+223	1
+224	1
+226	1
+228	1
+229	1
+230	1
+233	1
+235	1
+237	1
+238	1
+239	1
+24	1
+241	1
+242	1
+244	1
+247	1
+248	1
+249	1
+252	1
+255	1
+256	1
+257	1
+258	1
+26	1
+260	1
+262	1
+263	1
+265	1
+266	1
+27	1
+272	1
+273	1
+274	1
+275	1
+277	1
+278	1
+28	1
+280	1
+281	1
+282	1
+283	1
+284	1
+285	1
+286	1
+287	1
+288	1
+289	1
+291	1
+292	1
+296	1
+298	1
+30	1
+302	1
+305	1
+306	1
+307	1
+308	1
+309	1
+310	1
+311	1
+315	1
+316	1
+317	1
+318	1
+321	1
+322	1
+323	1
+325	1
+327	1
+33	1
+331	1
+332	1
+333	1
+335	1
+336	1
+338	1
+339	1
+34	1
+341	1
+342	1
+344	1
+345	1
+348	1
+35	1
+351	1
+353	1
+356	1
+360	1
+362	1
+364	1
+365	1
+366	1
+367	1
+368	1
+369	1
+37	1
+373	1
+374	1
+375	1
+377	1
+378	1
+379	1
+382	1
+384	1
+386	1
+389	1
+392	1
+393	1
+394	1
+395	1
+396	1
+397	1
+399	1
+4	1
+400	1
+401	1
+402	1
+403	1
+404	1
+406	1
+407	1
+409	1
+41	1
+411	1
+413	1
+414	1
+417	1
+418	1
+419	1
+42	1
+421	1
+424	1
+427	1
+429	1
+43	1
+430	1
+431	1
+432	1
+435	1
+436	1
+437	1
+438	1
+439	1
+44	1
+443	1
+444	1
+446	1
+448	1
+449	1
+452	1
+453	1
+454	1
+455	1
+457	1
+458	1
+459	1
+460	1
+462	1
+463	1
+466	1
+467	1
+468	1
+469	1
+47	1
+470	1
+472	1
+475	1
+477	1
+478	1
+479	1
+480	1
+481	1
+482	1
+483	1
+484	1
+485	1
+487	1
+489	1
+490	1
+491	1
+492	1
+493	1
+494	1
+495	1
+496	1
+497	1
+498	1
+5	1
+51	1
+53	1
+54	1
+57	1
+58	1
+64	1
+65	1
+66	1
+67	1
+69	1
+70	1
+72	1
+74	1
+76	1
+77	1
+78	1
+8	1
+80	1
+82	1
+83	1
+84	1
+85	1
+86	1
+87	1
+9	1
+90	1
+92	1
+95	1
+96	1
+97	1
+98	1
+tst1	1
+PREHOOK: query: select * from DEST1_acid_8
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_8
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_8
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_8
+#### A masked pattern was here ####
+0	val_0	1
+10	val_10	1
+100	val_100	1
+103	val_103	1
+104	val_104	1
+105	val_105	1
+11	val_11	1
+111	val_111	1
+113	val_113	1
+114	val_114	1
+116	val_116	1
+118	val_118	1
+119	val_119	1
+12	val_12	1
+120	val_120	1
+125	val_125	1
+126	val_126	1
+128	val_128	1
+129	val_129	1
+131	val_131	1
+133	val_133	1
+134	val_134	1
+136	val_136	1
+137	val_137	1
+138	val_138	1
+143	val_143	1
+145	val_145	1
+146	val_146	1
+149	val_149	1
+15	val_15	1
+150	val_150	1
+152	val_152	1
+153	val_153	1
+155	val_155	1
+156	val_156	1
+157	val_157	1
+158	val_158	1
+160	val_160	1
+162	val_162	1
+163	val_163	1
+164	val_164	1
+165	val_165	1
+166	val_166	1
+167	val_167	1
+168	val_168	1
+169	val_169	1
+17	val_17	1
+170	val_170	1
+172	val_172	1
+174	val_174	1
+175	val_175	1
+176	val_176	1
+177	val_177	1
+178	val_178	1
+179	val_179	1
+18	val_18	1
+180	val_180	1
+181	val_181	1
+183	val_183	1
+186	val_186	1
+187	val_187	1
+189	val_189	1
+19	val_19	1
+190	val_190	1
+191	val_191	1
+192	val_192	1
+193	val_193	1
+194	val_194	1
+195	val_195	1
+196	val_196	1
+197	val_197	1
+199	val_199	1
+2	val_2	1
+20	val_20	1
+200	val_200	1
+201	val_201	1
+202	val_202	1
+203	val_203	1
+205	val_205	1
+207	val_207	1
+208	val_208	1
+209	val_209	1
+213	val_213	1
+214	val_214	1
+216	val_216	1
+217	val_217	1
+218	val_218	1
+219	val_219	1
+221	val_221	1
+222	val_222	1
+223	val_223	1
+224	val_224	1
+226	val_226	1
+228	val_228	1
+229	val_229	1
+230	val_230	1
+233	val_233	1
+235	val_235	1
+237	val_237	1
+238	val_238	1
+239	val_239	1
+24	val_24	1
+241	val_241	1
+242	val_242	1
+244	val_244	1
+247	val_247	1
+248	val_248	1
+249	val_249	1
+252	val_252	1
+255	val_255	1
+256	val_256	1
+257	val_257	1
+258	val_258	1
+26	val_26	1
+260	val_260	1
+262	val_262	1
+263	val_263	1
+265	val_265	1
+266	val_266	1
+27	val_27	1
+272	val_272	1
+273	val_273	1
+274	val_274	1
+275	val_275	1
+277	val_277	1
+278	val_278	1
+28	val_28	1
+280	val_280	1
+281	val_281	1
+282	val_282	1
+283	val_283	1
+284	val_284	1
+285	val_285	1
+286	val_286	1
+287	val_287	1
+288	val_288	1
+289	val_289	1
+291	val_291	1
+292	val_292	1
+296	val_296	1
+298	val_298	1
+30	val_30	1
+302	val_302	1
+305	val_305	1
+306	val_306	1
+307	val_307	1
+308	val_308	1
+309	val_309	1
+310	val_310	1
+311	val_311	1
+315	val_315	1
+316	val_316	1
+317	val_317	1
+318	val_318	1
+321	val_321	1
+322	val_322	1
+323	val_323	1
+325	val_325	1
+327	val_327	1
+33	val_33	1
+331	val_331	1
+332	val_332	1
+333	val_333	1
+335	val_335	1
+336	val_336	1
+338	val_338	1
+339	val_339	1
+34	val_34	1
+341	val_341	1
+342	val_342	1
+344	val_344	1
+345	val_345	1
+348	val_348	1
+35	val_35	1
+351	val_351	1
+353	val_353	1
+356	val_356	1
+360	val_360	1
+362	val_362	1
+364	val_364	1
+365	val_365	1
+366	val_366	1
+367	val_367	1
+368	val_368	1
+369	val_369	1
+37	val_37	1
+373	val_373	1
+374	val_374	1
+375	val_375	1
+377	val_377	1
+378	val_378	1
+379	val_379	1
+382	val_382	1
+384	val_384	1
+386	val_386	1
+389	val_389	1
+392	val_392	1
+393	val_393	1
+394	val_394	1
+395	val_395	1
+396	val_396	1
+397	val_397	1
+399	val_399	1
+4	val_4	1
+400	val_400	1
+401	val_401	1
+402	val_402	1
+403	val_403	1
+404	val_404	1
+406	val_406	1
+407	val_407	1
+409	val_409	1
+41	val_41	1
+411	val_411	1
+413	val_413	1
+414	val_414	1
+417	val_417	1
+418	val_418	1
+419	val_419	1
+42	val_42	1
+421	val_421	1
+424	val_424	1
+427	val_427	1
+429	val_429	1
+43	val_43	1
+430	val_430	1
+431	val_431	1
+432	val_432	1
+435	val_435	1
+436	val_436	1
+437	val_437	1
+438	val_438	1
+439	val_439	1
+44	val_44	1
+443	val_443	1
+444	val_444	1
+446	val_446	1
+448	val_448	1
+449	val_449	1
+452	val_452	1
+453	val_453	1
+454	val_454	1
+455	val_455	1
+457	val_457	1
+458	val_458	1
+459	val_459	1
+460	val_460	1
+462	val_462	1
+463	val_463	1
+466	val_466	1
+467	val_467	1
+468	val_468	1
+469	val_469	1
+47	val_47	1
+470	val_470	1
+472	val_472	1
+475	val_475	1
+477	val_477	1
+478	val_478	1
+479	val_479	1
+480	val_480	1
+481	val_481	1
+482	val_482	1
+483	val_483	1
+484	val_484	1
+485	val_485	1
+487	val_487	1
+489	val_489	1
+490	val_490	1
+491	val_491	1
+492	val_492	1
+493	val_493	1
+494	val_494	1
+495	val_495	1
+496	val_496	1
+497	val_497	1
+498	val_498	1
+5	val_5	1
+51	val_51	1
+53	val_53	1
+54	val_54	1
+57	val_57	1
+58	val_58	1
+64	val_64	1
+65	val_65	1
+66	val_66	1
+67	val_67	1
+69	val_69	1
+70	val_70	1
+72	val_72	1
+74	val_74	1
+76	val_76	1
+77	val_77	1
+78	val_78	1
+8	val_8	1
+80	val_80	1
+82	val_82	1
+83	val_83	1
+84	val_84	1
+85	val_85	1
+86	val_86	1
+87	val_87	1
+9	val_9	1
+90	val_90	1
+92	val_92	1
+95	val_95	1
+96	val_96	1
+97	val_97	1
+98	val_98	1
+tst1	500	1
+PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION distinct 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_9 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_10 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_10
+PREHOOK: Output: default@dest1_acid_9
+POSTHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION distinct 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_9 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_10 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_10
+POSTHOOK: Output: default@dest1_acid_9
+POSTHOOK: Lineage: dest1_acid_10.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_10.val1 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_10.val2 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_9.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_9.value EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_9
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_9
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_9
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_9
+#### A masked pattern was here ####
+0	1
+10	1
+100	1
+103	1
+104	1
+105	1
+11	1
+111	1
+113	1
+114	1
+116	1
+118	1
+119	1
+12	1
+120	1
+125	1
+126	1
+128	1
+129	1
+131	1
+133	1
+134	1
+136	1
+137	1
+138	1
+143	1
+145	1
+146	1
+149	1
+15	1
+150	1
+152	1
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	1
+165	1
+166	1
+167	1
+168	1
+169	1
+17	1
+170	1
+172	1
+174	1
+175	1
+176	1
+177	1
+178	1
+179	1
+18	1
+180	1
+181	1
+183	1
+186	1
+187	1
+189	1
+19	1
+190	1
+191	1
+192	1
+193	1
+194	1
+195	1
+196	1
+197	1
+199	1
+2	1
+20	1
+200	1
+201	1
+202	1
+203	1
+205	1
+207	1
+208	1
+209	1
+213	1
+214	1
+216	1
+217	1
+218	1
+219	1
+221	1
+222	1
+223	1
+224	1
+226	1
+228	1
+229	1
+230	1
+233	1
+235	1
+237	1
+238	1
+239	1
+24	1
+241	1
+242	1
+244	1
+247	1
+248	1
+249	1
+252	1
+255	1
+256	1
+257	1
+258	1
+26	1
+260	1
+262	1
+263	1
+265	1
+266	1
+27	1
+272	1
+273	1
+274	1
+275	1
+277	1
+278	1
+28	1
+280	1
+281	1
+282	1
+283	1
+284	1
+285	1
+286	1
+287	1
+288	1
+289	1
+291	1
+292	1
+296	1
+298	1
+30	1
+302	1
+305	1
+306	1
+307	1
+308	1
+309	1
+310	1
+311	1
+315	1
+316	1
+317	1
+318	1
+321	1
+322	1
+323	1
+325	1
+327	1
+33	1
+331	1
+332	1
+333	1
+335	1
+336	1
+338	1
+339	1
+34	1
+341	1
+342	1
+344	1
+345	1
+348	1
+35	1
+351	1
+353	1
+356	1
+360	1
+362	1
+364	1
+365	1
+366	1
+367	1
+368	1
+369	1
+37	1
+373	1
+374	1
+375	1
+377	1
+378	1
+379	1
+382	1
+384	1
+386	1
+389	1
+392	1
+393	1
+394	1
+395	1
+396	1
+397	1
+399	1
+4	1
+400	1
+401	1
+402	1
+403	1
+404	1
+406	1
+407	1
+409	1
+41	1
+411	1
+413	1
+414	1
+417	1
+418	1
+419	1
+42	1
+421	1
+424	1
+427	1
+429	1
+43	1
+430	1
+431	1
+432	1
+435	1
+436	1
+437	1
+438	1
+439	1
+44	1
+443	1
+444	1
+446	1
+448	1
+449	1
+452	1
+453	1
+454	1
+455	1
+457	1
+458	1
+459	1
+460	1
+462	1
+463	1
+466	1
+467	1
+468	1
+469	1
+47	1
+470	1
+472	1
+475	1
+477	1
+478	1
+479	1
+480	1
+481	1
+482	1
+483	1
+484	1
+485	1
+487	1
+489	1
+490	1
+491	1
+492	1
+493	1
+494	1
+495	1
+496	1
+497	1
+498	1
+5	1
+51	1
+53	1
+54	1
+57	1
+58	1
+64	1
+65	1
+66	1
+67	1
+69	1
+70	1
+72	1
+74	1
+76	1
+77	1
+78	1
+8	1
+80	1
+82	1
+83	1
+84	1
+85	1
+86	1
+87	1
+9	1
+90	1
+92	1
+95	1
+96	1
+97	1
+98	1
+tst1	1
+PREHOOK: query: select * from DEST1_acid_10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_10
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_10
+#### A masked pattern was here ####
+0	val_0	1
+10	val_10	1
+100	val_100	1
+103	val_103	1
+104	val_104	1
+105	val_105	1
+11	val_11	1
+111	val_111	1
+113	val_113	1
+114	val_114	1
+116	val_116	1
+118	val_118	1
+119	val_119	1
+12	val_12	1
+120	val_120	1
+125	val_125	1
+126	val_126	1
+128	val_128	1
+129	val_129	1
+131	val_131	1
+133	val_133	1
+134	val_134	1
+136	val_136	1
+137	val_137	1
+138	val_138	1
+143	val_143	1
+145	val_145	1
+146	val_146	1
+149	val_149	1
+15	val_15	1
+150	val_150	1
+152	val_152	1
+153	val_153	1
+155	val_155	1
+156	val_156	1
+157	val_157	1
+158	val_158	1
+160	val_160	1
+162	val_162	1
+163	val_163	1
+164	val_164	1
+165	val_165	1
+166	val_166	1
+167	val_167	1
+168	val_168	1
+169	val_169	1
+17	val_17	1
+170	val_170	1
+172	val_172	1
+174	val_174	1
+175	val_175	1
+176	val_176	1
+177	val_177	1
+178	val_178	1
+179	val_179	1
+18	val_18	1
+180	val_180	1
+181	val_181	1
+183	val_183	1
+186	val_186	1
+187	val_187	1
+189	val_189	1
+19	val_19	1
+190	val_190	1
+191	val_191	1
+192	val_192	1
+193	val_193	1
+194	val_194	1
+195	val_195	1
+196	val_196	1
+197	val_197	1
+199	val_199	1
+2	val_2	1
+20	val_20	1
+200	val_200	1
+201	val_201	1
+202	val_202	1
+203	val_203	1
+205	val_205	1
+207	val_207	1
+208	val_208	1
+209	val_209	1
+213	val_213	1
+214	val_214	1
+216	val_216	1
+217	val_217	1
+218	val_218	1
+219	val_219	1
+221	val_221	1
+222	val_222	1
+223	val_223	1
+224	val_224	1
+226	val_226	1
+228	val_228	1
+229	val_229	1
+230	val_230	1
+233	val_233	1
+235	val_235	1
+237	val_237	1
+238	val_238	1
+239	val_239	1
+24	val_24	1
+241	val_241	1
+242	val_242	1
+244	val_244	1
+247	val_247	1
+248	val_248	1
+249	val_249	1
+252	val_252	1
+255	val_255	1
+256	val_256	1
+257	val_257	1
+258	val_258	1
+26	val_26	1
+260	val_260	1
+262	val_262	1
+263	val_263	1
+265	val_265	1
+266	val_266	1
+27	val_27	1
+272	val_272	1
+273	val_273	1
+274	val_274	1
+275	val_275	1
+277	val_277	1
+278	val_278	1
+28	val_28	1
+280	val_280	1
+281	val_281	1
+282	val_282	1
+283	val_283	1
+284	val_284	1
+285	val_285	1
+286	val_286	1
+287	val_287	1
+288	val_288	1
+289	val_289	1
+291	val_291	1
+292	val_292	1
+296	val_296	1
+298	val_298	1
+30	val_30	1
+302	val_302	1
+305	val_305	1
+306	val_306	1
+307	val_307	1
+308	val_308	1
+309	val_309	1
+310	val_310	1
+311	val_311	1
+315	val_315	1
+316	val_316	1
+317	val_317	1
+318	val_318	1
+321	val_321	1
+322	val_322	1
+323	val_323	1
+325	val_325	1
+327	val_327	1
+33	val_33	1
+331	val_331	1
+332	val_332	1
+333	val_333	1
+335	val_335	1
+336	val_336	1
+338	val_338	1
+339	val_339	1
+34	val_34	1
+341	val_341	1
+342	val_342	1
+344	val_344	1
+345	val_345	1
+348	val_348	1
+35	val_35	1
+351	val_351	1
+353	val_353	1
+356	val_356	1
+360	val_360	1
+362	val_362	1
+364	val_364	1
+365	val_365	1
+366	val_366	1
+367	val_367	1
+368	val_368	1
+369	val_369	1
+37	val_37	1
+373	val_373	1
+374	val_374	1
+375	val_375	1
+377	val_377	1
+378	val_378	1
+379	val_379	1
+382	val_382	1
+384	val_384	1
+386	val_386	1
+389	val_389	1
+392	val_392	1
+393	val_393	1
+394	val_394	1
+395	val_395	1
+396	val_396	1
+397	val_397	1
+399	val_399	1
+4	val_4	1
+400	val_400	1
+401	val_401	1
+402	val_402	1
+403	val_403	1
+404	val_404	1
+406	val_406	1
+407	val_407	1
+409	val_409	1
+41	val_41	1
+411	val_411	1
+413	val_413	1
+414	val_414	1
+417	val_417	1
+418	val_418	1
+419	val_419	1
+42	val_42	1
+421	val_421	1
+424	val_424	1
+427	val_427	1
+429	val_429	1
+43	val_43	1
+430	val_430	1
+431	val_431	1
+432	val_432	1
+435	val_435	1
+436	val_436	1
+437	val_437	1
+438	val_438	1
+439	val_439	1
+44	val_44	1
+443	val_443	1
+444	val_444	1
+446	val_446	1
+448	val_448	1
+449	val_449	1
+452	val_452	1
+453	val_453	1
+454	val_454	1
+455	val_455	1
+457	val_457	1
+458	val_458	1
+459	val_459	1
+460	val_460	1
+462	val_462	1
+463	val_463	1
+466	val_466	1
+467	val_467	1
+468	val_468	1
+469	val_469	1
+47	val_47	1
+470	val_470	1
+472	val_472	1
+475	val_475	1
+477	val_477	1
+478	val_478	1
+479	val_479	1
+480	val_480	1
+481	val_481	1
+482	val_482	1
+483	val_483	1
+484	val_484	1
+485	val_485	1
+487	val_487	1
+489	val_489	1
+490	val_490	1
+491	val_491	1
+492	val_492	1
+493	val_493	1
+494	val_494	1
+495	val_495	1
+496	val_496	1
+497	val_497	1
+498	val_498	1
+5	val_5	1
+51	val_51	1
+53	val_53	1
+54	val_54	1
+57	val_57	1
+58	val_58	1
+64	val_64	1
+65	val_65	1
+66	val_66	1
+67	val_67	1
+69	val_69	1
+70	val_70	1
+72	val_72	1
+74	val_74	1
+76	val_76	1
+77	val_77	1
+78	val_78	1
+8	val_8	1
+80	val_80	1
+82	val_82	1
+83	val_83	1
+84	val_84	1
+85	val_85	1
+86	val_86	1
+87	val_87	1
+9	val_9	1
+90	val_90	1
+92	val_92	1
+95	val_95	1
+96	val_96	1
+97	val_97	1
+98	val_98	1
+tst1	500	1
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_1
+PREHOOK: Output: default@dest1_acid_1
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_1
+POSTHOOK: Output: default@dest1_acid_1
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_2
+PREHOOK: Output: default@dest1_acid_2
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_2
+POSTHOOK: Output: default@dest1_acid_2
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_3
+PREHOOK: Output: default@dest1_acid_3
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_3
+POSTHOOK: Output: default@dest1_acid_3
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_4
+PREHOOK: Output: default@dest1_acid_4
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_4
+POSTHOOK: Output: default@dest1_acid_4
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_5
+PREHOOK: Output: default@dest1_acid_5
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_5
+POSTHOOK: Output: default@dest1_acid_5
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_6
+PREHOOK: Output: default@dest1_acid_6
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_6
+POSTHOOK: Output: default@dest1_acid_6
diff --git a/ql/src/test/results/clientpositive/mm_all.q.out b/ql/src/test/results/clientpositive/mm_all.q.out
index 143ebd6..16ba2f2 100644
--- a/ql/src/test/results/clientpositive/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/mm_all.q.out
@@ -1647,6 +1647,7 @@ POSTHOOK: Input: default@intermediate_n0@p=455
 POSTHOOK: Input: default@intermediate_n0@p=456
 POSTHOOK: Input: default@intermediate_n0@p=457
 POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
 POSTHOOK: Output: default@multi1_mm@p=455
 POSTHOOK: Output: default@multi1_mm@p=456
 POSTHOOK: Output: default@multi1_mm@p=457
@@ -1660,6 +1661,8 @@ POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
 PREHOOK: query: select key, key2, p from multi1_mm order by key, key2, p
 PREHOOK: type: QUERY
 PREHOOK: Input: default@multi1_mm
@@ -1727,6 +1730,18 @@ POSTHOOK: Input: default@intermediate_n0@p=455
 POSTHOOK: Input: default@intermediate_n0@p=456
 POSTHOOK: Input: default@intermediate_n0@p=457
 POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
+POSTHOOK: Output: default@multi1_mm@p=455
+POSTHOOK: Output: default@multi1_mm@p=456
+POSTHOOK: Output: default@multi1_mm@p=457
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
 POSTHOOK: Lineage: ###Masked###
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 35a220f..6101caa 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -391,7 +391,7 @@ public class TestStreaming {
     rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
@@ -569,7 +569,7 @@ public class TestStreaming {
         + "INPUT__FILE__NAME from default.writeidconnection order by a");
     Assert.assertEquals(4, rs.size());
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000_0"));
     Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
     Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
@@ -727,7 +727,7 @@ public class TestStreaming {
     rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));