You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/02/23 16:31:15 UTC

[17/21] hive git commit: HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 236e585..5c13781 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringableMap;
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -110,7 +110,7 @@ public class CompactorMR {
   }
 
   private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-                                    ValidTxnList txns, CompactionInfo ci) {
+                                    ValidWriteIdList writeIds, CompactionInfo ci) {
     JobConf job = new JobConf(conf);
     job.setJobName(jobName);
     job.setOutputKeyClass(NullWritable.class);
@@ -135,7 +135,7 @@ public class CompactorMR {
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
     job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
-    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
     overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
     if (ci.properties != null) {
       overrideTblProps(job, t.getParameters(), ci.properties);
@@ -197,12 +197,12 @@ public class CompactorMR {
    * @param jobName name to run this job with
    * @param t metastore table
    * @param sd metastore storage descriptor
-   * @param txns list of valid transactions
+   * @param writeIds list of valid write ids
    * @param ci CompactionInfo
    * @throws java.io.IOException if the job fails
    */
-  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-           ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
+  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+           CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
 
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -213,18 +213,18 @@ public class CompactorMR {
     if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
       LOG.debug("Going to delete directories for aborted transactions for MM table "
           + t.getDbName() + "." + t.getTableName());
-      removeFiles(conf, sd.getLocation(), txns, t);
+      removeFiles(conf, sd.getLocation(), writeIds, t);
       return;
     }
 
-    JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
+    JobConf job = createBaseJobConf(conf, jobName, t, sd, writeIds, ci);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
     // getSplits below) because as part of this we discover our minimum and maximum transactions,
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns, false, true);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, false, true);
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
     int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
     if(parsedDeltas.size() > maxDeltastoHandle) {
@@ -242,14 +242,14 @@ public class CompactorMR {
         "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
       int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
       for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
-        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns, ci);
+        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, writeIds, ci);
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
           parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
           maxDeltastoHandle, -1, conf, txnHandler, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
-      dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+      dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
     }
 
     StringableList dirsToSearch = new StringableList();
@@ -280,8 +280,8 @@ public class CompactorMR {
     if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) {
       // Skip compaction if there's no delta files AND there's no original files
       String minOpenInfo = ".";
-      if(txns.getMinOpenTxn() != null) {
-        minOpenInfo = " with min Open " + JavaUtils.txnIdToString(txns.getMinOpenTxn()) +
+      if(writeIds.getMinOpenWriteId() != null) {
+        minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) +
           ".  Compaction cannot compact above this txnid";
       }
       LOG.error("No delta files or original files found to compact in " + sd.getLocation() +
@@ -316,8 +316,8 @@ public class CompactorMR {
       LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
       dirsToSearch.add(delta.getPath());
       deltaDirs.add(delta.getPath());
-      minTxn = Math.min(minTxn, delta.getMinTransaction());
-      maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
+      minTxn = Math.min(minTxn, delta.getMinWriteId());
+      maxTxn = Math.max(maxTxn, delta.getMaxWriteId());
     }
 
     if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
@@ -379,9 +379,9 @@ public class CompactorMR {
   }
 
   // Remove the directories for aborted transactions only
-  private void removeFiles(HiveConf conf, String location, ValidTxnList txnList, Table t)
+  private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
       throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList,
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
         Ref.from(false), false, t.getParameters());
     // For MM table, we only want to delete delta dirs for aborted txns.
     List<FileStatus> abortedDirs = dir.getAbortedDirectories();
@@ -718,13 +718,13 @@ public class CompactorMR {
       @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
       AcidInputFormat<WritableComparable, V> aif =
           instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
-      ValidTxnList txnList =
-          new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+      ValidWriteIdList writeIdList =
+          new ValidCompactorWriteIdList(jobConf.get(ValidWriteIdList.VALID_WRITEIDS_KEY));
 
       boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =
           aif.getRawReader(jobConf, isMajor, split.getBucket(),
-              txnList, split.getBaseDir(), split.getDeltaDirs());
+                  writeIdList, split.getBaseDir(), split.getDeltaDirs());
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
@@ -779,8 +779,8 @@ public class CompactorMR {
             .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
             .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
             .reporter(reporter)
-            .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
-            .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+            .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+            .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
             .bucket(bucket)
             .statementId(-1);//setting statementId == -1 makes compacted delta files use
         //delta_xxxx_yyyy format
@@ -804,8 +804,8 @@ public class CompactorMR {
           .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
           .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
           .reporter(reporter)
-          .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
-          .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+          .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+          .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
           .bucket(bucket)
           .statementId(-1);//setting statementId == -1 makes compacted delta files use
         //delta_xxxx_yyyy format
@@ -926,8 +926,8 @@ public class CompactorMR {
         AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
             .writingBase(conf.getBoolean(IS_MAJOR, false))
             .isCompressed(conf.getBoolean(IS_COMPRESSED, false))
-            .minimumTransactionId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
-            .maximumTransactionId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
+            .minimumWriteId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
+            .maximumWriteId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
             .bucket(0)
             .statementId(-1);
         Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a52e023..7eda7fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -21,11 +21,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,8 +90,6 @@ public class Initiator extends CompactorThread {
           startedAt = System.currentTimeMillis();
           //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          ValidTxnList txns =
-              TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
           Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
@@ -143,12 +143,21 @@ public class Initiator extends CompactorThread {
                     ", assuming it has been dropped and moving on.");
                 continue;
               }
+
+              // Compaction doesn't work under a transaction and hence pass null for validTxnList
+              // The response will have one entry per table and hence we get only one ValidWriteIdList
+              String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+              GetValidWriteIdsRequest rqst
+                      = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null);
+              ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
+                      txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
+
               StorageDescriptor sd = resolveStorageDescriptor(t, p);
               String runAs = findUserToRunAs(sd.getLocation(), t);
               /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
               * Long term we should consider having a thread pool here and running checkForCompactionS
               * in parallel*/
-              CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs);
+              CompactionType compactionNeeded = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " +
@@ -215,7 +224,7 @@ public class Initiator extends CompactorThread {
   }
 
   private CompactionType checkForCompaction(final CompactionInfo ci,
-                                            final ValidTxnList txns,
+                                            final ValidWriteIdList writeIds,
                                             final StorageDescriptor sd,
                                             final Map<String, String> tblproperties,
                                             final String runAs)
@@ -227,7 +236,7 @@ public class Initiator extends CompactorThread {
       return CompactionType.MAJOR;
     }
     if (runJobAsSelf(runAs)) {
-      return determineCompactionType(ci, txns, sd, tblproperties);
+      return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
       LOG.info("Going to initiate as user " + runAs);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
@@ -235,7 +244,7 @@ public class Initiator extends CompactorThread {
       CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
         @Override
         public CompactionType run() throws Exception {
-          return determineCompactionType(ci, txns, sd, tblproperties);
+          return determineCompactionType(ci, writeIds, sd, tblproperties);
         }
       });
       try {
@@ -248,7 +257,7 @@ public class Initiator extends CompactorThread {
     }
   }
 
-  private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
+  private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds,
                                                  StorageDescriptor sd, Map<String, String> tblproperties)
       throws IOException, InterruptedException {
 
@@ -259,7 +268,7 @@ public class Initiator extends CompactorThread {
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns, false, false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, writeIds, false, false);
     Path base = dir.getBaseDirectory();
     long baseSize = 0;
     FileStatus stat = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e5ebf9a..c47e78e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,16 +18,17 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Driver;
@@ -138,10 +139,15 @@ public class Worker extends CompactorThread {
         }
 
         final boolean isMajor = ci.isMajorCompaction();
-        final ValidTxnList txns =
-            TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
-        LOG.debug("ValidCompactTxnList: " + txns.writeToString());
-        txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
+
+        // Compaction doesn't work under a transaction and hence pass null for validTxnList
+        // The response will have one entry per table and hence we get only one ValidWriteIdList
+        String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+        GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null);
+        final ValidWriteIdList tblValidWriteIds =
+                TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
+        LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+        txnHandler.setCompactionHighestWriteId(ci, tblValidWriteIds.getHighWatermark());
         final StringBuilder jobName = new StringBuilder(name);
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
@@ -164,14 +170,14 @@ public class Worker extends CompactorThread {
         launchedJob = true;
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
+            mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
+                mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 2663061..45890ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.metastore.txn;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -56,7 +58,7 @@ import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 
 /**
- * Tests for TxnHandler.
+ * Tests for CompactionTxnHandler.
  */
 public class TestCompactionTxnHandler {
 
@@ -478,6 +480,13 @@ public class TestCompactionTxnHandler {
     String tableName = "adp_table";
     OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
     long txnId = openTxns.getTxn_ids().get(0);
+
+    AllocateTableWriteIdsResponse writeIds
+            = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(openTxns.getTxn_ids(), dbName, tableName));
+    long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
+    assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId());
+    assertEquals(1, writeId);
+
     // lock a table, as in dynamic partitions
     LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
     lc.setIsDynamicPartitionWrite(true);
@@ -489,7 +498,7 @@ public class TestCompactionTxnHandler {
     LockResponse lock = txnHandler.lock(lr);
     assertEquals(LockState.ACQUIRED, lock.getState());
 
-    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeId, dbName, tableName,
       Arrays.asList("ds=yesterday", "ds=today"));
     adp.setOperationType(dop);
     txnHandler.addDynamicPartitions(adp);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 9eaf039..7b510dd 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -71,6 +73,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -149,7 +152,14 @@ public class TestTxnHandler {
     txnHandler.abortTxn(new AbortTxnRequest(1));
     List<String> parts = new ArrayList<String>();
     parts.add("p=1");
-    AddDynamicPartitions adp = new AddDynamicPartitions(3, "default", "T", parts);
+
+    AllocateTableWriteIdsResponse writeIds
+            = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(3L), "default", "T"));
+    long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
+    assertEquals(3, writeIds.getTxnToWriteIds().get(0).getTxnId());
+    assertEquals(1, writeId);
+
+    AddDynamicPartitions adp = new AddDynamicPartitions(3, writeId, "default", "T", parts);
     adp.setOperationType(DataOperationType.INSERT);
     txnHandler.addDynamicPartitions(adp);
     GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 2a1545f..470856b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -760,14 +760,18 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     Assert.assertEquals(536936448,
       BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
     Assert.assertEquals("", 4, rs.size());
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0),
+            rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1),
+            rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2),
+            rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000016_0000016_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(3),
+            rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000001_0000001_0000/bucket_00001"));
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
@@ -777,14 +781,18 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
       LOG.warn(s);
     }
     Assert.assertEquals("", 4, rs.size());
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000016/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
+    Assert.assertTrue(rs.get(0),
+            rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000"));
+    Assert.assertTrue(rs.get(1),
+            rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
+    Assert.assertTrue(rs.get(2),
+            rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
+    Assert.assertTrue(rs.get(3),
+            rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
 
     //make sure they are the same before and after compaction
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index bab6d5e..2eead9e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -366,15 +366,15 @@ public class TestTxnCommands2 {
      * Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
      */
     String[][] expected = {
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13",  "bucket_00000"},
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
-      {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6",   "bucket_00001"},
-      {"{\"transactionid\":20,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
+        {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13",  "bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
+        {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
+        {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2",   "bucket_00001"},
+        {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4",   "bucket_00001"},
+        {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5",   "bucket_00001"},
+        {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6",   "bucket_00001"},
+        {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
     };
     Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
     for(int i = 0; i < expected.length; i++) {
@@ -759,11 +759,11 @@ public class TestTxnCommands2 {
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
         Arrays.sort(buckets);
         if (numDelta == 1) {
-          Assert.assertEquals("delta_0000024_0000024_0000", status[i].getPath().getName());
+          Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
           Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numDelta == 2) {
-          Assert.assertEquals("delta_0000025_0000025_0000", status[i].getPath().getName());
+          Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
           Assert.assertEquals(1, buckets.length);
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
@@ -772,7 +772,7 @@ public class TestTxnCommands2 {
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
         Arrays.sort(buckets);
         if (numDeleteDelta == 1) {
-          Assert.assertEquals("delete_delta_0000024_0000024_0000", status[i].getPath().getName());
+          Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName());
           Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
@@ -819,7 +819,7 @@ public class TestTxnCommands2 {
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numBase == 2) {
           // The new base dir now has two bucket files, since the delta dir has two bucket files
-          Assert.assertEquals("base_0000025", status[i].getPath().getName());
+          Assert.assertEquals("base_0000002", status[i].getPath().getName());
           Assert.assertEquals(1, buckets.length);
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
@@ -845,7 +845,7 @@ public class TestTxnCommands2 {
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
     Assert.assertEquals(1, status.length);
-    Assert.assertEquals("base_0000025", status[0].getPath().getName());
+    Assert.assertEquals("base_0000002", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
     Arrays.sort(buckets);
     Assert.assertEquals(1, buckets.length);
@@ -861,7 +861,7 @@ public class TestTxnCommands2 {
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
     runStatementOnDriver("select * from " + Table.NONACIDORCTBL);
-    String value = hiveConf.get(ValidTxnList.VALID_TXNS_KEY);
+    String value = hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
     Assert.assertNull("The entry should be null for query that doesn't involve ACID tables", value);
   }
 
@@ -874,9 +874,9 @@ public class TestTxnCommands2 {
     //this will cause next txn to be marked aborted but the data is still written to disk
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
     runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData2));
-    assert hiveConf.get(ValidTxnList.VALID_TXNS_KEY) == null : "previous txn should've cleaned it";
+    assert hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) == null : "previous txn should've cleaned it";
     //so now if HIVEFETCHTASKCONVERSION were to use a stale value, it would use a
-    //ValidTxnList with HWM=MAX_LONG, i.e. include the data for aborted txn
+    //ValidWriteIdList with HWM=MAX_LONG, i.e. include the data for aborted txn
     List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
     Assert.assertEquals("Extra data", 2, rs.size());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 3a3272f..0a305a4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -105,13 +105,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][]{
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"},
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}};
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}};
     checkResult(expected, testQuery, isVectorized, "load data inpath");
     runStatementOnDriver("update T set b = 17 where a = 1");
     String[][] expected2 = new String[][]{
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"},
-      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"}
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"}
     };
     checkResult(expected2, testQuery, isVectorized, "update");
 
@@ -121,15 +121,15 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected3 = new String[][] {
-      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"},
-      {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"}
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004/bucket_00000"},
+        {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004/bucket_00000"}
     };
     checkResult(expected3, testQuery, isVectorized, "delete compact minor");
 
     runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T");
     String[][] expected4 = new String[][]{
-      {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"},
-      {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}};
+        {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"},
+        {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}};
     checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite");
 
     //load same data again (additive)
@@ -138,9 +138,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("delete from T where a = 3");//matches 2 rows
     runStatementOnDriver("insert into T values(2,2)");
     String[][] expected5 = new String[][]{
-      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
-      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
-      {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"}
+        {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
+        {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
+        {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"}
     };
     checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
 
@@ -148,9 +148,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected6 = new String[][]{
-      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"},
-      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"},
-      {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"}
+        {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009/bucket_00000"},
+        {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009/bucket_00000"},
+        {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009/bucket_00000"}
     };
     checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
   }
@@ -173,22 +173,22 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
-      //normal insert
-      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"},
-      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"},
-      //Load Data
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}};
+        //normal insert
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+        //Load Data
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}};
     checkResult(expected, testQuery, isVectorized, "load data inpath");
 
     //test minor compaction
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected1 = new String[][] {
-      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"},
-      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"}
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002/bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002/bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002/bucket_00000"}
     };
     checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
 
@@ -197,11 +197,11 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected2 = new String[][] {
-      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"},
-      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"},
-      {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"}
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003/bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003/bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003/bucket_00000"},
+        {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003/bucket_00000"}
     };
     checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
 
@@ -210,8 +210,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
     runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
     String[][] expected3 = new String[][] {
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"},
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}};
+        {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"},
+        {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}};
     checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite");
 
     //one more major compaction
@@ -219,9 +219,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected4 = new String[][] {
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"},
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"},
-      {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}};
+        {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005/bucket_00000"},
+        {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005/bucket_00000"},
+        {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005/bucket_00000"}};
     checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
   }
   /**
@@ -254,24 +254,23 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
 /*
 {"transactionid":0,"bucketid":536870912,"rowid":0}     0       2/000000_0
 {"transactionid":0,"bucketid":536870912,"rowid":1}     0       4/000000_0
-{"transactionid":24,"bucketid":536870912,"rowid":0}    4       4/delta_0000024_0000024_0000/000000_0
-{"transactionid":24,"bucketid":536870912,"rowid":1}    5       5/delta_0000024_0000024_0000/000000_0
+{"transactionid":1,"bucketid":536870912,"rowid":0}    4       4/delta_0000001_0000001_0000/000000_0
+{"transactionid":1,"bucketid":536870912,"rowid":1}    5       5/delta_0000001_0000001_0000/000000_0
 */
     String[][] expected = new String[][] {
-      //from pre-acid insert
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
-      //from Load Data into acid converted table
-      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"},
-      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"},
-      {"{\"transactionid\":24,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000024_0000024_0000/000001_0"},
-      {"{\"transactionid\":24,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000024_0000024_0000/000001_0"},
-      {"{\"transactionid\":24,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000024_0000024_0000/000002_0"},
-      {"{\"transactionid\":24,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000024_0000024_0000/000002_0"},
+        //from pre-acid insert
+        {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
+        {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
+        //from Load Data into acid converted table
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000001_0000001_0000/000001_0"},
+        {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000001_0000001_0000/000001_0"},
+        {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000001_0000001_0000/000002_0"},
+        {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000001_0000001_0000/000002_0"},
     };
     checkResult(expected, testQuery, isVectorized, "load data inpath");
 
-
     //create more staging data with copy_N files and do LD+Overwrite
     runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
     runStatementOnDriver("insert into Tstage values(8,8)");
@@ -279,9 +278,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
 
     String[][] expected2 = new String[][] {
-      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000030/000000_0"},
-      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000030/000000_0"},
-      {"{\"transactionid\":30,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000030/000001_0"}
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000002/000000_0"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000002/000000_0"},
+        {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000002/000001_0"}
     };
     checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite");
 
@@ -291,11 +290,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     TestTxnCommands2.runWorker(hiveConf);
 
     String[][] expected3 = new String[][] {
-      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/bucket_00000"},
-      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/bucket_00000"},
-      {"{\"transactionid\":30,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000033/bucket_00001"},
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000033/bucket_00000"}
-
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000003/bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000003/bucket_00000"},
+        {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000003/bucket_00001"},
+        {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000003/bucket_00000"}
     };
     checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
   }
@@ -326,12 +324,12 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
 
     List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
     String[][] expected = new String[][] {
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
-      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"},
-      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"},
-      {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"},
-      {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}};
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000002_0000002_0000/000000_0"},
+        {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000002_0000002_0000/000000_0"},
+        {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000003_0000003_0000/000000_0"},
+        {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000003_0000003_0000/000000_0"}};
     checkExpected(rs, expected, "load data inpath partitioned");
 
 
@@ -340,10 +338,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("truncate table Tstage");
     runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)");
     String[][] expected2 = new String[][] {
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"},
-      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}};
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000004/000000_0"},
+        {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000004/000000_0"}};
     rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
     checkExpected(rs, expected2, "load data inpath partitioned overwrite");
   }
@@ -405,20 +403,20 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000019_0000019_0001/000000_0"},
-      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000019_0000019_0001/000000_0"}
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"},
+        {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"}
     };
     checkResult(expected, testQuery, isVectorized, "load data inpath");
 
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected2 = new String[][] {
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000019/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000019/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000019/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000019/bucket_00000"}
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001/bucket_00000"}
     };
     checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
     //at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
@@ -444,8 +442,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
       "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
     String[][] expected = new String[][] {
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"}
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}
     };
     checkResult(expected, testQuery, isVectorized, "load data inpath");
   }