Posted to commits@hive.apache.org by sa...@apache.org on 2018/02/23 16:31:19 UTC

[21/21] hive git commit: HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)

HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbb9233a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbb9233a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbb9233a

Branch: refs/heads/master
Commit: cbb9233a3b39ab8489d777fc76f0758c49b69bef
Parents: f9768af
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri Feb 23 22:00:23 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri Feb 23 22:00:23 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/JavaUtils.java    |   11 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    2 +-
 .../streaming/AbstractRecordWriter.java         |   24 +-
 .../streaming/DelimitedInputWriter.java         |    8 +-
 .../hive/hcatalog/streaming/HiveEndPoint.java   |   77 +-
 .../hive/hcatalog/streaming/RecordWriter.java   |   12 +-
 .../hcatalog/streaming/StrictJsonWriter.java    |    8 +-
 .../hcatalog/streaming/StrictRegexWriter.java   |    8 +-
 .../hcatalog/streaming/TransactionBatch.java    |   47 +-
 .../streaming/mutate/client/AcidTable.java      |   14 +-
 .../mutate/client/AcidTableSerializer.java      |   10 +-
 .../streaming/mutate/client/MutatorClient.java  |   17 +-
 .../mutate/worker/MutatorCoordinator.java       |   22 +-
 .../streaming/mutate/worker/MutatorFactory.java |    3 +-
 .../streaming/mutate/worker/MutatorImpl.java    |   18 +-
 .../mutate/worker/SequenceValidator.java        |   14 +-
 .../hive/hcatalog/streaming/TestStreaming.java  |  109 +-
 .../mutate/ReflectiveMutatorFactory.java        |    4 +-
 .../streaming/mutate/StreamingAssert.java       |   16 +-
 .../mutate/client/TestAcidTableSerializer.java  |    6 +-
 .../mutate/client/TestMutatorClient.java        |   12 +-
 .../mutate/worker/TestMutatorCoordinator.java   |   26 +-
 .../mutate/worker/TestMutatorImpl.java          |   14 +-
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |   64 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  106 +-
 .../upgrade/derby/044-HIVE-16997.derby.sql      |    1 -
 .../upgrade/derby/050-HIVE-18192.derby.sql      |   27 +
 .../derby/hive-txn-schema-3.0.0.derby.sql       |   27 +-
 .../derby/upgrade-2.3.0-to-3.0.0.derby.sql      |    1 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  107 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java |    4 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   12 +-
 .../hadoop/hive/ql/exec/FetchOperator.java      |   28 +-
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |    4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   18 +-
 .../hadoop/hive/ql/exec/ImportCommitTask.java   |    2 +-
 .../hadoop/hive/ql/exec/ImportCommitWork.java   |   10 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |    6 +-
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java |    1 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   |    8 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |    1 +
 .../hadoop/hive/ql/io/AcidInputFormat.java      |   34 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |   30 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  234 +-
 .../hadoop/hive/ql/io/HiveFileFormatUtils.java  |    4 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   42 +-
 .../hadoop/hive/ql/io/RecordIdentifier.java     |   20 +-
 .../apache/hadoop/hive/ql/io/RecordUpdater.java |   13 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   41 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  |   14 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  126 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   92 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |  144 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   69 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   14 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   27 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   45 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |    8 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |    4 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   82 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |   14 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   59 +-
 .../hadoop/hive/ql/plan/FileMergeDesc.java      |   10 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |   14 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |   35 +-
 .../hadoop/hive/ql/plan/TableScanDesc.java      |   12 +-
 .../hadoop/hive/ql/stats/ColStatsProcessor.java |    1 -
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |   21 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |   54 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |   27 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   28 +-
 .../metastore/txn/TestCompactionTxnHandler.java |   13 +-
 .../hive/metastore/txn/TestTxnHandler.java      |   12 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |   34 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   36 +-
 .../apache/hadoop/hive/ql/TestTxnLoadData.java  |  148 +-
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  318 +-
 .../hive/ql/exec/TestFileSinkOperator.java      |   18 +-
 .../hadoop/hive/ql/io/TestAcidInputFormat.java  |   16 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |   73 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   26 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  129 +-
 .../hive/ql/io/orc/TestOrcRecordUpdater.java    |   16 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   14 +-
 .../hive/ql/lockmgr/TestDbTxnManager2.java      |  109 +-
 ...TestGenMapRedUtilsCreateConditionalTask.java |    2 +-
 .../parse/TestUpdateDeleteSemanticAnalyzer.java |    1 +
 .../hive/ql/txn/compactor/CompactorTest.java    |   39 +-
 .../hive/ql/txn/compactor/TestCleaner.java      |   30 +-
 .../hive/ql/txn/compactor/TestInitiator.java    |   50 +-
 .../hive/ql/txn/compactor/TestWorker.java       |   45 +-
 .../results/clientpositive/acid_nullscan.q.out  |    8 +-
 .../clientpositive/acid_table_stats.q.out       |   14 +-
 .../clientpositive/autoColumnStats_4.q.out      |    2 +-
 .../llap/acid_bucket_pruning.q.out              |    4 +-
 .../test/results/clientpositive/row__id.q.out   |   22 +-
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 3724 +++++++----
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h    |  292 +
 .../ThriftHiveMetastore_server.skeleton.cpp     |   10 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3341 ++++++----
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |  307 +-
 .../metastore/api/AddDynamicPartitions.java     |  159 +-
 .../api/AllocateTableWriteIdsRequest.java       |  640 ++
 .../api/AllocateTableWriteIdsResponse.java      |  443 ++
 .../metastore/api/ClearFileMetadataRequest.java |   32 +-
 .../hive/metastore/api/ClientCapabilities.java  |   32 +-
 .../hive/metastore/api/CompactionRequest.java   |   44 +-
 .../hive/metastore/api/CreationMetadata.java    |   32 +-
 .../hive/metastore/api/FireEventRequest.java    |   32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |   36 +-
 .../api/GetFileMetadataByExprRequest.java       |   32 +-
 .../api/GetFileMetadataByExprResult.java        |   48 +-
 .../metastore/api/GetFileMetadataRequest.java   |   32 +-
 .../metastore/api/GetFileMetadataResult.java    |   44 +-
 .../hive/metastore/api/GetTablesRequest.java    |   32 +-
 .../hive/metastore/api/GetTablesResult.java     |   36 +-
 .../metastore/api/GetValidWriteIdsRequest.java  |  539 ++
 .../metastore/api/GetValidWriteIdsResponse.java |  443 ++
 .../api/HeartbeatTxnRangeResponse.java          |   64 +-
 .../metastore/api/InsertEventRequestData.java   |   64 +-
 .../hadoop/hive/metastore/api/LockRequest.java  |   36 +-
 .../hive/metastore/api/Materialization.java     |   32 +-
 .../api/NotificationEventResponse.java          |   36 +-
 .../metastore/api/PutFileMetadataRequest.java   |   64 +-
 .../hive/metastore/api/ShowCompactResponse.java |   36 +-
 .../hive/metastore/api/ShowLocksResponse.java   |   36 +-
 .../hive/metastore/api/TableValidWriteIds.java  |  851 +++
 .../hive/metastore/api/ThriftHiveMetastore.java | 6010 ++++++++++++------
 .../hadoop/hive/metastore/api/TxnToWriteId.java |  482 ++
 .../hive/metastore/api/WMFullResourcePlan.java  |  144 +-
 .../api/WMGetAllResourcePlanResponse.java       |   36 +-
 .../WMGetTriggersForResourePlanResponse.java    |   36 +-
 .../api/WMValidateResourcePlanResponse.java     |   64 +-
 .../gen-php/metastore/ThriftHiveMetastore.php   | 1949 ++++--
 .../src/gen/thrift/gen-php/metastore/Types.php  | 1469 ++++-
 .../hive_metastore/ThriftHiveMetastore-remote   |   14 +
 .../hive_metastore/ThriftHiveMetastore.py       | 1397 ++--
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  | 1023 ++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |  136 +-
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |  137 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   11 +
 .../hive/metastore/HiveMetaStoreClient.java     |   38 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   50 +-
 .../hive/metastore/txn/CompactionInfo.java      |   18 +-
 .../metastore/txn/CompactionTxnHandler.java     |   41 +-
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |   49 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  430 +-
 .../hadoop/hive/metastore/txn/TxnStore.java     |   33 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  129 +-
 .../main/sql/derby/hive-schema-3.0.0.derby.sql  |   27 +-
 .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql  |   26 +
 .../main/sql/mssql/hive-schema-3.0.0.mssql.sql  |   27 +-
 .../sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql  |   26 +
 .../main/sql/mysql/hive-schema-3.0.0.mysql.sql  |   26 +-
 .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql  |   26 +
 .../sql/oracle/hive-schema-3.0.0.oracle.sql     |   27 +-
 .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql    |   26 +
 .../sql/postgres/hive-schema-3.0.0.postgres.sql |   27 +-
 .../upgrade-2.3.0-to-3.0.0.postgres.sql         |   26 +
 .../src/main/thrift/hive_metastore.thrift       |   50 +-
 .../hive/common/ValidCompactorTxnList.java      |   89 -
 .../hive/common/ValidCompactorWriteIdList.java  |   93 +
 .../hadoop/hive/common/ValidReadTxnList.java    |   12 +-
 .../hive/common/ValidReaderWriteIdList.java     |  254 +
 .../apache/hadoop/hive/common/ValidTxnList.java |    7 -
 .../hadoop/hive/common/ValidTxnWriteIdList.java |  101 +
 .../hadoop/hive/common/ValidWriteIdList.java    |  118 +
 .../hive/common/TestValidCompactorTxnList.java  |  134 -
 .../common/TestValidCompactorWriteIdList.java   |  142 +
 .../hive/common/TestValidReaderWriteIdList.java |  120 +
 170 files changed, 21424 insertions(+), 8611 deletions(-)
----------------------------------------------------------------------
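
For orientation: the patch replaces the single global transaction ID in ACID data paths with a per-table write ID, allocated through the metastore for every table a transaction writes to. Below is a minimal sketch of that allocation flow, not part of the patch, using only calls that appear in the diffs further down (openTxns, allocateTableWriteId, commitTxn); the database/table names and the agent string are placeholders:

  // Sketch only. Assumes an org.apache.hadoop.hive.metastore.IMetaStoreClient handle;
  // "db", "t1", "t2" and "agent" are placeholder values.
  static void allocatePerTableWriteIds(IMetaStoreClient msClient) throws Exception {
    long txnId = msClient.openTxns("agent", 1).getTxn_ids().get(0);
    long writeIdT1 = msClient.allocateTableWriteId(txnId, "db", "t1");
    long writeIdT2 = msClient.allocateTableWriteId(txnId, "db", "t2");
    // Each table's write ID advances independently of the global txnId; delta/base
    // directory names are derived from these write IDs (see the test changes below).
    msClient.commitTxn(txnId);
  }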


http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 57afbf8..de0c283 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -150,14 +150,15 @@ public final class JavaUtils {
   public static String lockIdToString(long extLockId) {
     return "lockid:" + extLockId;
   }
-  /**
-   * Utility method for ACID to normalize logging info.  Matches
-   * org.apache.hadoop.hive.metastore.api.LockResponse#toString
-   */
+
   public static String txnIdToString(long txnId) {
     return "txnid:" + txnId;
   }
 
+  public static String writeIdToString(long writeId) {
+    return "writeid:" + writeId;
+  }
+
   public static String txnIdsToString(List<Long> txnIds) {
     return "Transactions requested to be aborted: " + txnIds.toString();
   }
@@ -166,7 +167,7 @@ public final class JavaUtils {
     // prevent instantiation
   }
 
-  public static Long extractTxnId(Path file) {
+  public static Long extractWriteId(Path file) {
     String fileName = file.getName();
     String[] parts = fileName.split("_", 4);  // e.g. delta_0000001_0000001_0000 or base_0000022
     if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
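
The renamed extractWriteId reads the write ID out of ACID delta/base directory names. A standalone sketch of the same parse (not the patched method itself), assuming names shaped like the examples in the comment above (delta_0000001_0000001_0000, base_0000022):

  // Sketch mirroring JavaUtils.extractWriteId; "delta"/"base" and the underscore
  // layout are taken from the example above, and the helper name is hypothetical.
  static Long parseWriteId(String dirName) {
    String[] parts = dirName.split("_", 4);
    if (parts.length < 2 || !("delta".equals(parts[0]) || "base".equals(parts[0]))) {
      return null; // not an ACID delta/base directory
    }
    return Long.parseLong(parts[1]); // 1 for the delta example, 22 for base_0000022
  }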

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 169ddcb..0880a96 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1094,7 +1094,7 @@ public class HiveConf extends Configuration {
     HIVESCRIPTTRUNCATEENV("hive.script.operator.truncate.env", false,
         "Truncate each environment variable for external script in scripts operator to 20KB (to fit system limits)"),
     HIVESCRIPT_ENV_BLACKLIST("hive.script.operator.env.blacklist",
-        "hive.txn.valid.txns,hive.script.operator.env.blacklist",
+        "hive.txn.valid.txns,hive.txn.tables.valid.writeids,hive.txn.valid.writeids,hive.script.operator.env.blacklist",
         "Comma separated list of keys from the configuration file not to convert to environment " +
         "variables when invoking the script operator"),
     HIVE_STRICT_CHECKS_ORDERBY_NO_LIMIT("hive.strict.checks.orderby.no.limit", false,

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 4ec10ad..924e233 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -73,8 +73,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   private final AcidOutputFormat<?,?> outf;
   private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
-  private Long curBatchMinTxnId;
-  private Long curBatchMaxTxnId;
+  private Long curBatchMinWriteId;
+  private Long curBatchMaxWriteId;
 
   private static final class TableWriterPair {
     private final Table tbl;
@@ -143,7 +143,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
    * used to tag error msgs to provied some breadcrumbs
    */
   String getWatermark() {
-    return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]";
+    return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
   }
   // return the column numbers of the bucketed columns
   private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
@@ -207,15 +207,15 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   /**
    * Creates a new record updater for the new batch
-   * @param minTxnId smallest Txnid in the batch
-   * @param maxTxnID largest Txnid in the batch
+   * @param minWriteId smallest writeid in the batch
+   * @param maxWriteID largest writeid in the batch
    * @throws StreamingIOFailure if failed to create record updater
    */
   @Override
-  public void newBatch(Long minTxnId, Long maxTxnID)
+  public void newBatch(Long minWriteId, Long maxWriteID)
           throws StreamingIOFailure, SerializationError {
-    curBatchMinTxnId = minTxnId;
-    curBatchMaxTxnId = maxTxnID;
+    curBatchMinWriteId = minWriteId;
+    curBatchMaxWriteId = maxWriteID;
     updaters = new ArrayList<RecordUpdater>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
       updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds
@@ -265,7 +265,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return bucketFieldData;
   }
 
-  private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
+  private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID)
           throws IOException, SerializationError {
     try {
       // Initialize table properties from the table parameters. This is required because the table
@@ -278,8 +278,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
                       .inspector(getSerde().getObjectInspector())
                       .bucket(bucketId)
                       .tableProperties(tblProperties)
-                      .minimumTransactionId(minTxnId)
-                      .maximumTransactionId(maxTxnID)
+                      .minimumWriteId(minWriteId)
+                      .maximumWriteId(maxWriteID)
                       .statementId(-1)
                       .finalDestination(partitionPath));
     } catch (SerDeException e) {
@@ -292,7 +292,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     RecordUpdater recordUpdater = updaters.get(bucketId);
     if (recordUpdater == null) {
       try {
-        recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, curBatchMaxTxnId);
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId);
       } catch (IOException e) {
         String errMsg = "Failed creating RecordUpdater for " + getWatermark();
         LOG.error(errMsg, e);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 0a5492c..999c37e 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -255,16 +255,16 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   }
 
   @Override
-  public void write(long transactionId, byte[] record)
+  public void write(long writeId, byte[] record)
           throws SerializationError, StreamingIOFailure {
     try {
       byte[] orderedFields = reorderFields(record);
       Object encodedRow = encode(orderedFields);
       int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
     } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction ("
-              + transactionId + ")", e);
+      throw new StreamingIOFailure("Error writing record in transaction write id ("
+              + writeId + ")", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 6793d09..90731dc 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -551,7 +552,7 @@ public class HiveEndPoint {
     private final IMetaStoreClient msClient;
     private final IMetaStoreClient heartbeaterMSClient;
     private final RecordWriter recordWriter;
-    private final List<Long> txnIds;
+    private final List<TxnToWriteId> txnToWriteIds;
 
     //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
     private volatile int currentTxnIndex = -1;
@@ -602,14 +603,19 @@ public class HiveEndPoint {
         this.recordWriter = recordWriter;
         this.agentInfo = agentInfo;
 
-        txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
+        assert(txnToWriteIds.size() == numTxns);
+
         txnStatus = new TxnState[numTxns];
         for(int i = 0; i < txnStatus.length; i++) {
+          assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
           txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
         }
-
         this.state = TxnState.INACTIVE;
-        recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+
+        // The Write Ids returned for the transaction batch is also sequential
+        recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
         success = true;
       } catch (TException e) {
         throw new TransactionBatchUnAvailable(endPt, e);
@@ -632,12 +638,26 @@ public class HiveEndPoint {
         public Object run() throws Exception {
           return msClient.openTxns(user, numTxns).getTxn_ids();
         }
-      }) ;
+      });
+    }
+
+    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
+                                                    final List<Long> txnIds, UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+      }
+      return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+        }
+      });
     }
 
     @Override
     public String toString() {
-      if (txnIds==null || txnIds.isEmpty()) {
+      if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
         return "{}";
       }
       StringBuilder sb = new StringBuilder(" TxnStatus[");
@@ -646,7 +666,11 @@ public class HiveEndPoint {
         sb.append(state == null ? "N" : state);
       }
       sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
-      return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1)
+      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+              + "/" + txnToWriteIds.get(0).getWriteId()
+              + "..."
+              + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
+              + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
               + "] on endPoint = " + endPt + "; " + sb;
     }
 
@@ -680,7 +704,8 @@ public class HiveEndPoint {
 
     private void beginNextTransactionImpl() throws TransactionError {
       state = TxnState.INACTIVE;//clear state from previous txn
-      if ( currentTxnIndex + 1 >= txnIds.size() ) {
+
+      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
         throw new InvalidTrasactionState("No more transactions available in" +
                 " current batch for end point : " + endPt);
       }
@@ -699,13 +724,25 @@ public class HiveEndPoint {
     }
 
     /**
-     * Get Id of currently open transaction
+     * Get Id of currently open transaction.
      * @return -1 if there is no open TX
      */
     @Override
     public Long getCurrentTxnId() {
-      if(currentTxnIndex >= 0) {
-        return txnIds.get(currentTxnIndex);
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getTxnId();
+      }
+      return -1L;
+    }
+
+    /**
+     * Get Id of currently open transaction.
+     * @return -1 if there is no open TX
+     */
+    @Override
+    public Long getCurrentWriteId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getWriteId();
       }
       return -1L;
     }
@@ -727,9 +764,9 @@ public class HiveEndPoint {
     @Override
     public int remainingTransactions() {
       if (currentTxnIndex>=0) {
-        return txnIds.size() - currentTxnIndex -1;
+        return txnToWriteIds.size() - currentTxnIndex -1;
       }
-      return txnIds.size();
+      return txnToWriteIds.size();
     }
 
 
@@ -824,7 +861,7 @@ public class HiveEndPoint {
     private void writeImpl(Collection<byte[]> records)
             throws StreamingException {
       for (byte[] record : records) {
-        recordWriter.write(getCurrentTxnId(), record);
+        recordWriter.write(getCurrentWriteId(), record);
       }
     }
 
@@ -869,7 +906,7 @@ public class HiveEndPoint {
     private void commitImpl() throws TransactionError, StreamingException {
       try {
         recordWriter.flush();
-        msClient.commitTxn(txnIds.get(currentTxnIndex));
+        msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
         state = TxnState.COMMITTED;
         txnStatus[currentTxnIndex] = TxnState.COMMITTED;
       } catch (NoSuchTxnException e) {
@@ -932,8 +969,8 @@ public class HiveEndPoint {
           int minOpenTxnIndex = Math.max(currentTxnIndex +
             (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
           for(currentTxnIndex = minOpenTxnIndex;
-              currentTxnIndex < txnIds.size(); currentTxnIndex++) {
-            msClient.rollbackTxn(txnIds.get(currentTxnIndex));
+              currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+            msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
             txnStatus[currentTxnIndex] = TxnState.ABORTED;
           }
           currentTxnIndex--;//since the loop left it == txnId.size()
@@ -960,15 +997,15 @@ public class HiveEndPoint {
       if(isClosed) {
         return;
       }
-      if(state != TxnState.OPEN && currentTxnIndex >= txnIds.size() - 1) {
+      if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) {
         //here means last txn in the batch is resolved but the close() hasn't been called yet so
         //there is nothing to heartbeat
         return;
       }
       //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still
       //points at the last txn which we don't want to heartbeat
-      Long first = txnIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1);
-      Long last = txnIds.get(txnIds.size()-1);
+      Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1).getTxnId();
+      Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
       try {
         HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
         if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
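
Net effect of the HiveEndPoint changes: the batch still opens N transactions up front, but now also asks the metastore for a matching list of TxnToWriteId pairs for the target table, and hands the write-id range (not the txn-id range) to the RecordWriter. A condensed sketch of that flow; msClient, recordWriter and numTxns stand in for the fields of the same names above, and "db"/"tbl"/"agent" are placeholders:

  // Sketch of the open-then-allocate flow in TransactionBatchImpl.
  List<Long> txnIds = msClient.openTxns("agent", numTxns).getTxn_ids();
  List<TxnToWriteId> txnToWriteIds =
      msClient.allocateTableWriteIdsBatch(txnIds, "db", "tbl");
  // One entry per txn, in the same (sequential) order, as the asserts above check.
  recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(),
                        txnToWriteIds.get(numTxns - 1).getWriteId());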

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
index cddb8de..a9bcd9f 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
@@ -23,21 +23,21 @@ public interface RecordWriter {
 
   /** Writes using a hive RecordUpdater
    *
-   * @param transactionId the ID of the Txn in which the write occurs
+   * @param writeId the write ID of the table mapping to Txn in which the write occurs
    * @param record the record to be written
    */
-  public void write(long transactionId, byte[] record) throws StreamingException;
+  void write(long writeId, byte[] record) throws StreamingException;
 
   /** Flush records from buffer. Invoked by TransactionBatch.commit() */
-  public void flush() throws StreamingException;
+  void flush() throws StreamingException;
 
   /** Clear bufferred writes. Invoked by TransactionBatch.abort() */
-  public void clear() throws StreamingException;
+  void clear() throws StreamingException;
 
   /** Acquire a new RecordUpdater. Invoked when
    * StreamingConnection.fetchTransactionBatch() is called */
-  public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException;
+  void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException;
 
   /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
-  public void closeBatch() throws StreamingException;
+  void closeBatch() throws StreamingException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 357c537..4d92c55 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -117,15 +117,15 @@ public class StrictJsonWriter extends AbstractRecordWriter {
 
 
   @Override
-  public void write(long transactionId, byte[] record)
+  public void write(long writeId, byte[] record)
           throws StreamingIOFailure, SerializationError {
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
     } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction("
-              + transactionId + ")", e);
+      throw new StreamingIOFailure("Error writing record in transaction write id("
+              + writeId + ")", e);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
index 58db252..ae25662 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
@@ -124,15 +124,15 @@ public class StrictRegexWriter extends AbstractRecordWriter {
 
 
   @Override
-  public void write(long transactionId, byte[] record)
+  public void write(long writeId, byte[] record)
           throws StreamingIOFailure, SerializationError {
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
     } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction("
-              + transactionId + ")", e);
+      throw new StreamingIOFailure("Error writing record in transaction write id("
+              + writeId + ")", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
index e5ad475..1208377 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
@@ -46,73 +46,80 @@ public interface TransactionBatch  {
   }
 
   /**
-   * Activate the next available transaction in the current transaction batch
+   * Activate the next available transaction in the current transaction batch.
    * @throws StreamingException if not able to switch to next Txn
    * @throws InterruptedException if call in interrupted
    */
-  public void beginNextTransaction() throws StreamingException, InterruptedException;
+  void beginNextTransaction() throws StreamingException, InterruptedException;
 
   /**
-   * Get Id of currently open transaction
+   * Get Id of currently open transaction.
    * @return transaction id
    */
-  public Long getCurrentTxnId();
+  Long getCurrentTxnId();
+
+
+  /**
+   * Get write Id mapping to currently open transaction.
+   * @return write id
+   */
+  Long getCurrentWriteId();
 
   /**
-   * get state of current transaction
+   * get state of current transaction.
    */
-  public TxnState getCurrentTransactionState();
+  TxnState getCurrentTransactionState();
 
   /**
-   * Commit the currently open transaction
+   * Commit the currently open transaction.
    * @throws StreamingException if there are errors committing
    * @throws InterruptedException if call in interrupted
    */
-  public void commit() throws StreamingException, InterruptedException;
+  void commit() throws StreamingException, InterruptedException;
 
   /**
-   * Abort the currently open transaction
+   * Abort the currently open transaction.
    * @throws StreamingException if there are errors
    * @throws InterruptedException if call in interrupted
    */
-  public void abort() throws StreamingException, InterruptedException;
+  void abort() throws StreamingException, InterruptedException;
 
   /**
    * Remaining transactions are the ones that are not committed or aborted or open.
    * Current open transaction is not considered part of remaining txns.
    * @return number of transactions remaining this batch.
    */
-  public int remainingTransactions();
+  int remainingTransactions();
 
 
   /**
-   *  Write record using RecordWriter
+   *  Write record using RecordWriter.
    * @param record  the data to be written
    * @throws StreamingException if there are errors when writing
    * @throws InterruptedException if call in interrupted
    */
-  public void write(byte[] record) throws StreamingException, InterruptedException;
+  void write(byte[] record) throws StreamingException, InterruptedException;
 
   /**
-   *  Write records using RecordWriter
+   *  Write records using RecordWriter.
    * @throws StreamingException if there are errors when writing
    * @throws InterruptedException if call in interrupted
    */
-  public void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+  void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
 
 
   /**
    * Issues a heartbeat to hive metastore on the current and remaining txn ids
-   * to keep them from expiring
+   * to keep them from expiring.
    * @throws StreamingException if there are errors
    */
-  public void heartbeat() throws StreamingException;
+  void heartbeat() throws StreamingException;
 
   /**
-   * Close the TransactionBatch
+   * Close the TransactionBatch.
    * @throws StreamingException if there are errors closing batch
    * @throws InterruptedException if call in interrupted
    */
-  public void close() throws StreamingException, InterruptedException;
-  public boolean isClosed();
+  void close() throws StreamingException, InterruptedException;
+  boolean isClosed();
 }
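
Callers of the updated TransactionBatch keep driving the batch by transaction but can now also read the per-table write ID of the currently open transaction. A hedged usage sketch; the batch is assumed to come from StreamingConnection.fetchTransactionBatch() (see RecordWriter above), and LOG and the record bytes are placeholders:

  // Sketch only: abort handling and checked exceptions elided.
  while (batch.remainingTransactions() > 0) {
    batch.beginNextTransaction();
    // The txn id still identifies the metastore transaction; the write id is what
    // ends up in the delta directory names and row identifiers for this table.
    LOG.info("txn={} writeId={}", batch.getCurrentTxnId(), batch.getCurrentWriteId());
    batch.write("a,b".getBytes());
    batch.commit();
  }
  batch.close();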

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
index 5b371e3..50ba0c7 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
@@ -34,7 +34,7 @@ public class AcidTable implements Serializable {
   private final String tableName;
   private final boolean createPartitions;
   private final TableType tableType;
-  private long transactionId;
+  private long writeId;
 
   private Table table;
 
@@ -48,10 +48,10 @@ public class AcidTable implements Serializable {
   /**
    * Returns {@code 0} until such a time that a {@link Transaction} has been acquired (when
    * {@link MutatorClient#newTransaction()} exits), at which point this will return the
-   * {@link Transaction#getTransactionId() transaction id}.
+   * write id.
    */
-  public long getTransactionId() {
-    return transactionId;
+  public long getWriteId() {
+    return writeId;
   }
 
   public String getDatabaseName() {
@@ -105,8 +105,8 @@ public class AcidTable implements Serializable {
     return table;
   }
 
-  void setTransactionId(long transactionId) {
-    this.transactionId = transactionId;
+  void setWriteId(long writeId) {
+    this.writeId = writeId;
   }
 
   void setTable(Table table) {
@@ -123,7 +123,7 @@ public class AcidTable implements Serializable {
   public String toString() {
     return "AcidTable [databaseName=" + databaseName + ", tableName=" + tableName + ", createPartitions="
         + createPartitions + ", tableType=" + tableType + ", outputFormatName=" + getOutputFormatName()
-        + ", totalBuckets=" + getTotalBuckets() + ", transactionId=" + transactionId + "]";
+        + ", totalBuckets=" + getTotalBuckets() + ", writeId=" + writeId + "]";
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
index 32db5e3..98f9f40 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
@@ -54,10 +54,10 @@ public class AcidTableSerializer {
       data.writeUTF(table.getDatabaseName());
       data.writeUTF(table.getTableName());
       data.writeBoolean(table.createPartitions());
-      if (table.getTransactionId() <= 0) {
-        LOG.warn("Transaction ID <= 0. The recipient is probably expecting a transaction ID.");
+      if (table.getWriteId() <= 0) {
+        LOG.warn("Write ID <= 0. The recipient is probably expecting a table write ID.");
       }
-      data.writeLong(table.getTransactionId());
+      data.writeLong(table.getWriteId());
       data.writeByte(table.getTableType().getId());
 
       Table metaTable = table.getTable();
@@ -91,12 +91,12 @@ public class AcidTableSerializer {
       String databaseName = in.readUTF();
       String tableName = in.readUTF();
       boolean createPartitions = in.readBoolean();
-      long transactionId = in.readLong();
+      long writeId = in.readLong();
       TableType tableType = TableType.valueOf(in.readByte());
       int thriftLength = in.readInt();
 
       table = new AcidTable(databaseName, tableName, createPartitions, tableType);
-      table.setTransactionId(transactionId);
+      table.setWriteId(writeId);
 
       Table metaTable = null;
       if (thriftLength > 0) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
index 645274e..8ba6cf6 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
 import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
 import org.apache.thrift.TException;
@@ -94,8 +95,22 @@ public class MutatorClient implements Closeable {
       throw new TransactionException("Not connected - cannot create transaction.");
     }
     Transaction transaction = new Transaction(metaStoreClient, lockOptions);
+    long txnId = transaction.getTransactionId();
     for (AcidTable table : tables) {
-      table.setTransactionId(transaction.getTransactionId());
+      try {
+        table.setWriteId(metaStoreClient.allocateTableWriteId(txnId,
+                table.getDatabaseName(), table.getTableName()));
+      } catch (TException ex) {
+        try {
+          metaStoreClient.rollbackTxn(txnId);
+        } catch (TException e) {
+          LOG.warn("Allocation of write id failed for table {} and rollback transaction {} failed due to {}",
+                  AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName()), txnId, e.getMessage());
+        }
+        throw new TransactionException("Unable to allocate table write ID for table "
+                + AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName())
+                + " under txn " + txnId, ex);
+      }
     }
     LOG.debug("Created transaction {}", transaction);
     return transaction;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
index ae23153..5e804d7 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -98,11 +98,11 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /**
-   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId).
    * 
    * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
    *           using the values in the record's bucketed columns.
-   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId)
    *           sequence.
    * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
    *           been closed.
@@ -120,11 +120,11 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /**
-   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId).
    * 
    * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
    *           using the values in the record's bucketed columns.
-   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId)
    *           sequence.
    * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
    *           been closed.
@@ -142,11 +142,11 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /**
-   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId).
    * 
    * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
    *           using the values in the record's bucketed columns.
-   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId)
    *           sequence.
    * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
    *           been closed.
@@ -229,9 +229,9 @@ public class MutatorCoordinator implements Closeable, Flushable {
     sequenceValidator.reset();
     if (deleteDeltaIfExists) {
       // TODO: Should this be the concern of the mutator?
-      deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId);
+      deleteDeltaIfExists(newPartitionPath, table.getWriteId(), newBucketId);
     }
-    mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId);
+    mutator = mutatorFactory.newMutator(outputFormat, table.getWriteId(), newPartitionPath, newBucketId);
     bucketId = newBucketId;
     partitionValues = newPartitionValues;
     partitionPath = newPartitionPath;
@@ -282,12 +282,12 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /* A delta may be present from a previous failed task attempt. */
-  private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException {
+  private void deleteDeltaIfExists(Path partitionPath, long writeId, int bucketId) throws IOException {
     Path deltaPath = AcidUtils.createFilename(partitionPath,
         new AcidOutputFormat.Options(configuration)
             .bucket(bucketId)
-            .minimumTransactionId(transactionId)
-            .maximumTransactionId(transactionId));
+            .minimumWriteId(writeId)
+            .maximumWriteId(writeId));
     FileSystem fileSystem = deltaPath.getFileSystem(configuration);
     if (fileSystem.exists(deltaPath)) {
       LOG.info("Deleting existing delta path: {}", deltaPath);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
index 22cd9b7..da7558f 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 
 public interface MutatorFactory {
 
-  Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException;
+  Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketId)
+          throws IOException;
   
   RecordInspector newRecordInspector();
   

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
index 05cf8b7..84c477f 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 /** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. */
 public class MutatorImpl implements Mutator {
 
-  private final long transactionId;
+  private final long writeId;
   private final Path partitionPath;
   private final int bucketProperty;
   private final Configuration configuration;
@@ -44,11 +44,11 @@ public class MutatorImpl implements Mutator {
    * @throws IOException
    */
   public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector,
-      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketProperty) throws IOException {
+      AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketProperty) throws IOException {
     this.configuration = configuration;
     this.recordIdColumn = recordIdColumn;
     this.objectInspector = objectInspector;
-    this.transactionId = transactionId;
+    this.writeId = writeId;
     this.partitionPath = partitionPath;
     this.bucketProperty = bucketProperty;
 
@@ -57,17 +57,17 @@ public class MutatorImpl implements Mutator {
 
   @Override
   public void insert(Object record) throws IOException {
-    updater.insert(transactionId, record);
+    updater.insert(writeId, record);
   }
 
   @Override
   public void update(Object record) throws IOException {
-    updater.update(transactionId, record);
+    updater.update(writeId, record);
   }
 
   @Override
   public void delete(Object record) throws IOException {
-    updater.delete(transactionId, record);
+    updater.delete(writeId, record);
   }
 
   /**
@@ -89,7 +89,7 @@ public class MutatorImpl implements Mutator {
 
   @Override
   public String toString() {
-    return "ObjectInspectorMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath
+    return "ObjectInspectorMutator [writeId=" + writeId + ", partitionPath=" + partitionPath
         + ", bucketId=" + bucketProperty + "]";
   }
 
@@ -101,8 +101,8 @@ public class MutatorImpl implements Mutator {
         new AcidOutputFormat.Options(configuration)
             .inspector(objectInspector)
             .bucket(bucketId)
-            .minimumTransactionId(transactionId)
-            .maximumTransactionId(transactionId)
+            .minimumWriteId(writeId)
+            .maximumWriteId(writeId)
             .recordIdColumn(recordIdColumn)
             .finalDestination(partitionPath)
             .statementId(-1));

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
index 5cd8081..320b987 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
@@ -29,22 +29,22 @@ class SequenceValidator {
 
   private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class);
 
-  private Long lastTxId;
+  private Long lastWriteId;
   private Long lastRowId;
 
   SequenceValidator() {
   }
 
   boolean isInSequence(RecordIdentifier recordIdentifier) {
-    if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) {
-      LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier);
+    if (lastWriteId != null && recordIdentifier.getWriteId() < lastWriteId) {
+      LOG.debug("Non-sequential write ID. Expected >{}, recordIdentifier={}", lastWriteId, recordIdentifier);
       return false;
-    } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null
+    } else if (lastWriteId != null && recordIdentifier.getWriteId() == lastWriteId && lastRowId != null
         && recordIdentifier.getRowId() <= lastRowId) {
       LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier);
       return false;
     }
-    lastTxId = recordIdentifier.getTransactionId();
+    lastWriteId = recordIdentifier.getWriteId();
     lastRowId = recordIdentifier.getRowId();
     return true;
   }
@@ -53,14 +53,14 @@ class SequenceValidator {
    * Validator must be reset for each new partition and or bucket.
    */
   void reset() {
-    lastTxId = null;
+    lastWriteId = null;
     lastRowId = null;
     LOG.debug("reset");
   }
 
   @Override
   public String toString() {
-    return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]";
+    return "SequenceValidator [lastWriteId=" + lastWriteId + ", lastRowId=" + lastRowId + "]";
   }
 
 }
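
SequenceValidator enforces the ordering contract documented in MutatorCoordinator: within one (partition, bucket) group, records must arrive in ascending (writeId, rowId) order, and the validator is reset whenever the group changes. A small usage sketch; the iterable and the exception choice are placeholders (the coordinator itself raises RecordSequenceException):

  // Sketch only; recordIdsForOneBucket is a placeholder iterable of RecordIdentifier.
  SequenceValidator validator = new SequenceValidator();
  for (RecordIdentifier id : recordIdsForOneBucket) {
    if (!validator.isInSequence(id)) {
      throw new IllegalStateException("Out of (writeId, rowId) order: " + id);
    }
  }
  validator.reset(); // before moving on to the next partition/bucket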

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index da2ca72..b042049 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -374,16 +374,16 @@ public class TestStreaming {
     Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
     rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
-    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
-    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
 
     queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
     queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
@@ -398,14 +398,14 @@ public class TestStreaming {
     runWorker(conf);
     rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
   }
 
   /**
@@ -540,8 +540,8 @@ public class TestStreaming {
   @Deprecated
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
-    ValidTxnList txns = msClient.getValidTxns();
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -555,11 +555,11 @@ public class TestStreaming {
     long min = Long.MAX_VALUE;
     long max = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta pd : current) {
-      if (pd.getMaxTransaction() > max) {
-        max = pd.getMaxTransaction();
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
       }
-      if (pd.getMinTransaction() < min) {
-        min = pd.getMinTransaction();
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
       }
     }
     Assert.assertEquals(minTxn, min);
@@ -573,7 +573,7 @@ public class TestStreaming {
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
@@ -593,7 +593,7 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
                                 String validationQuery, boolean vectorize, String... records) throws Exception {
-    ValidTxnList txns = msClient.getValidTxns();
+    ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
     AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
@@ -608,11 +608,11 @@ public class TestStreaming {
     long min = Long.MAX_VALUE;
     long max = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta pd : current) {
-      if (pd.getMaxTransaction() > max) {
-        max = pd.getMaxTransaction();
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
       }
-      if (pd.getMinTransaction() < min) {
-        min = pd.getMinTransaction();
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
       }
     }
     Assert.assertEquals(minTxn, min);
@@ -637,8 +637,8 @@ public class TestStreaming {
   }
 
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    ValidTxnList txns = msClient.getValidTxns();
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -877,7 +877,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -889,11 +889,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -945,7 +945,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -957,11 +957,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1007,7 +1007,7 @@ public class TestStreaming {
     txnBatch.write(rec1.getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -1134,7 +1134,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1153,13 +1153,13 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
     String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
-    checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
+    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten2(partLoc, 15, 24,  1, validationQuery, true, "1\tHello streaming",
+    checkDataWritten2(partLoc, 1, 10,  1, validationQuery, true, "1\tHello streaming",
             "2\tWelcome to streaming");
 
     txnBatch.close();
@@ -1170,14 +1170,14 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, false, "1\tHello streaming",
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, false, "1\tHello streaming",
             "2\tWelcome to streaming", "3\tHello streaming - once again");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, true, "1\tHello streaming",
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, true, "1\tHello streaming",
             "2\tWelcome to streaming", "3\tHello streaming - once again",
             "4\tWelcome to streaming - once again");
 
@@ -1214,14 +1214,15 @@ public class TestStreaming {
     txnBatch2.commit();
 
     String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
-    checkDataWritten2(partLoc, 24, 33, 1,
+    checkDataWritten2(partLoc, 11, 20, 1,
       validationQuery, true, "3\tHello streaming - once again");
 
     txnBatch1.commit();
    /*now both batches have committed (but not closed), so for each primary file we expect a side
    file to exist and indicate the true length of the primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
+            msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1234,7 +1235,7 @@ public class TestStreaming {
         Assert.assertTrue("", logicalLength == actualLength);
       }
     }
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.beginNextTransaction();
@@ -1246,7 +1247,7 @@ public class TestStreaming {
    //so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
    //has now received more data (logically - it's buffered) but it is not yet committed.
    //let's check that side files exist, etc.
-    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1259,19 +1260,19 @@ public class TestStreaming {
         Assert.assertTrue("", logicalLength <= actualLength);
       }
     }
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.commit();
 
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, false, "1\tHello streaming",
         "2\tWelcome to streaming",
         "3\tHello streaming - once again");
 
     txnBatch2.commit();
 
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, true, "1\tHello streaming",
         "2\tWelcome to streaming",
         "3\tHello streaming - once again",
@@ -2281,8 +2282,8 @@ public class TestStreaming {
       this.delegate = delegate;
     }
     @Override
-    public void write(long transactionId, byte[] record) throws StreamingException {
-      delegate.write(transactionId, record);
+    public void write(long writeId, byte[] record) throws StreamingException {
+      delegate.write(writeId, record);
       produceFault();
     }
     @Override
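
The recurring change in this test is the switch from the global ValidTxnList to a per-table ValidWriteIdList when resolving the ACID state of a partition. A condensed sketch of the new pattern, using only the calls that appear in the hunks above (local variable names are illustrative):

    ValidWriteIdList writeIds =
        msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
    for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
      // delta directories are now bounded by write IDs rather than transaction IDs
      long lo = pd.getMinWriteId();
      long hi = pd.getMaxWriteId();
    }
    // readers pick the list up from the job conf under the new key
    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());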

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
index e057da7..c05ddcf 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
@@ -49,9 +49,9 @@ public class ReflectiveMutatorFactory implements MutatorFactory {
   }
 
   @Override
-  public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId)
+  public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketId)
     throws IOException {
-    return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, transactionId, partitionPath,
+    return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, writeId, partitionPath,
         bucketId);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 873cddf..2aa8674 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -71,7 +71,7 @@ public class StreamingAssert {
   private List<String> partition;
   private IMetaStoreClient metaStoreClient;
   private Directory dir;
-  private ValidTxnList txns;
+  private ValidWriteIdList writeIds;
   private List<AcidUtils.ParsedDelta> currentDeltas;
   private long min;
   private long max;
@@ -83,9 +83,9 @@ public class StreamingAssert {
     this.table = table;
     this.partition = partition;
 
-    txns = metaStoreClient.getValidTxns();
+    writeIds = metaStoreClient.getValidWriteIds(AcidUtils.getFullTableName(table.getDbName(), table.getTableName()));
     partitionLocation = getPartitionLocation();
-    dir = AcidUtils.getAcidState(partitionLocation, conf, txns);
+    dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds);
     assertEquals(0, dir.getObsolete().size());
     assertEquals(0, dir.getOriginalFiles().size());
 
@@ -95,8 +95,8 @@ public class StreamingAssert {
     System.out.println("Files found: ");
     for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) {
       System.out.println(parsedDelta.getPath().toString());
-      max = Math.max(parsedDelta.getMaxTransaction(), max);
-      min = Math.min(parsedDelta.getMinTransaction(), min);
+      max = Math.max(parsedDelta.getMaxWriteId(), max);
+      min = Math.min(parsedDelta.getMinWriteId(), min);
     }
   }
 
@@ -145,7 +145,7 @@ public class StreamingAssert {
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
     assertEquals(numSplitsExpected, splits.length);
 
@@ -160,7 +160,7 @@ public class StreamingAssert {
 
       while (recordReader.next(key, value)) {
         RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
-        Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+        Record record = new Record(new RecordIdentifier(recordIdentifier.getWriteId(),
           recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
         System.out.println(record);
         records.add(record);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
index 7876e8d..1523a10 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
@@ -45,7 +45,7 @@ public class TestAcidTableSerializer {
 
     AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
     acidTable.setTable(table);
-    acidTable.setTransactionId(42L);
+    acidTable.setWriteId(42L);
 
     String encoded = AcidTableSerializer.encode(acidTable);
     System.out.println(encoded);
@@ -57,7 +57,7 @@ public class TestAcidTableSerializer {
     assertThat(decoded.getOutputFormatName(), is("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"));
     assertThat(decoded.getTotalBuckets(), is(10));
     assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
-    assertThat(decoded.getTransactionId(), is(42L));
+    assertThat(decoded.getWriteId(), is(42L));
     assertThat(decoded.getTableType(), is(TableType.SINK));
     assertThat(decoded.getTable(), is(table));
   }
@@ -75,7 +75,7 @@ public class TestAcidTableSerializer {
     assertThat(decoded.getOutputFormatName(), is(nullValue()));
     assertThat(decoded.getTotalBuckets(), is(0));
     assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
-    assertThat(decoded.getTransactionId(), is(0L));
+    assertThat(decoded.getWriteId(), is(0L));
     assertThat(decoded.getTableType(), is(TableType.SINK));
     assertThat(decoded.getTable(), is(nullValue()));
   }
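
In short, an AcidTable now carries a per-table write ID through serialization instead of a transaction ID. A minimal sketch of the round trip exercised above (the decode call is assumed to mirror encode; only encode appears in this hunk):

    AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
    acidTable.setWriteId(42L);                                // was setTransactionId(42L)
    String encoded = AcidTableSerializer.encode(acidTable);
    AcidTable decoded = AcidTableSerializer.decode(encoded);  // assumed counterpart of encode
    // decoded.getWriteId() == 42L, per the assertions above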

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
index cfe3a96..91b90ed 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
@@ -48,6 +48,8 @@ import org.mockito.runners.MockitoJUnitRunner;
 public class TestMutatorClient {
 
   private static final long TRANSACTION_ID = 42L;
+  private static final long WRITE_ID1 = 78L;
+  private static final long WRITE_ID2 = 33L;
   private static final String TABLE_NAME_1 = "TABLE_1";
   private static final String TABLE_NAME_2 = "TABLE_2";
   private static final String DB_NAME = "DB_1";
@@ -89,6 +91,8 @@ public class TestMutatorClient {
     when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString());
 
     when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+    when(mockMetaStoreClient.allocateTableWriteId(TRANSACTION_ID, DB_NAME, TABLE_NAME_1)).thenReturn(WRITE_ID1);
+    when(mockMetaStoreClient.allocateTableWriteId(TRANSACTION_ID, DB_NAME, TABLE_NAME_2)).thenReturn(WRITE_ID2);
 
     client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER,
         Collections.singletonList(TABLE_1));
@@ -110,13 +114,13 @@ public class TestMutatorClient {
     assertThat(outTables.get(0).getTableName(), is(TABLE_NAME_1));
     assertThat(outTables.get(0).getTotalBuckets(), is(2));
     assertThat(outTables.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
-    assertThat(outTables.get(0).getTransactionId(), is(0L));
+    assertThat(outTables.get(0).getWriteId(), is(0L));
     assertThat(outTables.get(0).getTable(), is(mockTable1));
     assertThat(outTables.get(1).getDatabaseName(), is(DB_NAME));
     assertThat(outTables.get(1).getTableName(), is(TABLE_NAME_2));
     assertThat(outTables.get(1).getTotalBuckets(), is(2));
     assertThat(outTables.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
-    assertThat(outTables.get(1).getTransactionId(), is(0L));
+    assertThat(outTables.get(1).getWriteId(), is(0L));
     assertThat(outTables.get(1).getTable(), is(mockTable2));
   }
 
@@ -179,8 +183,8 @@ public class TestMutatorClient {
 
     assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
     assertThat(transaction.getState(), is(TxnState.INACTIVE));
-    assertThat(outTables.get(0).getTransactionId(), is(TRANSACTION_ID));
-    assertThat(outTables.get(1).getTransactionId(), is(TRANSACTION_ID));
+    assertThat(outTables.get(0).getWriteId(), is(WRITE_ID1));
+    assertThat(outTables.get(1).getWriteId(), is(WRITE_ID2));
   }
 
   @Test
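
The mock setup above reflects the new metastore flow: a single transaction is opened and a separate write ID is then allocated for each target table. A minimal sketch of that flow, assuming the same metastore client interface the test mocks (local variable names are illustrative):

    long txnId = metaStoreClient.openTxn(USER);
    long writeId1 = metaStoreClient.allocateTableWriteId(txnId, DB_NAME, TABLE_NAME_1);
    long writeId2 = metaStoreClient.allocateTableWriteId(txnId, DB_NAME, TABLE_NAME_2);
    // each AcidTable sink then carries its own write ID, cf. the getWriteId() assertions above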

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
index d897477..fab56b3 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -49,7 +49,7 @@ public class TestMutatorCoordinator {
   private static final List<String> UNPARTITIONED = Collections.<String> emptyList();
   private static final List<String> PARTITION_B = Arrays.asList("B");
   private static final List<String> PARTITION_A = Arrays.asList("A");
-  private static final long TRANSACTION_ID = 2L;
+  private static final long WRITE_ID = 2L;
   private static final int BUCKET_ID = 0;
   private static final Path PATH_A = new Path("X");
   private static final Path PATH_B = new Path("B");
@@ -84,7 +84,7 @@ public class TestMutatorCoordinator {
   public void createCoordinator() throws Exception {
     when(mockAcidTable.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName());
     when(mockAcidTable.getTotalBuckets()).thenReturn(1);
-    when(mockAcidTable.getTransactionId()).thenReturn(TRANSACTION_ID);
+    when(mockAcidTable.getWriteId()).thenReturn(WRITE_ID);
     when(mockAcidTable.createPartitions()).thenReturn(true);
     when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector);
     when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver);
@@ -104,7 +104,7 @@ public class TestMutatorCoordinator {
     coordinator.insert(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).insert(RECORD);
   }
 
@@ -115,7 +115,7 @@ public class TestMutatorCoordinator {
     coordinator.insert(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator, times(3)).insert(RECORD);
   }
 
@@ -129,8 +129,8 @@ public class TestMutatorCoordinator {
 
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B), eq(BUCKET_ID));
     verify(mockMutator, times(2)).insert(RECORD);
   }
 
@@ -143,9 +143,9 @@ public class TestMutatorCoordinator {
     coordinator.update(UNPARTITIONED, RECORD);
     coordinator.delete(UNPARTITIONED, RECORD);
 
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutatorFactory)
-        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
+        .newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID + 1));
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }
@@ -166,11 +166,11 @@ public class TestMutatorCoordinator {
     coordinator.update(PARTITION_B, RECORD); /* PbB1 */
 
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
-    verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B),
         eq(BUCKET_ID));
     verify(mockMutatorFactory)
-        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1));
+        .newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B), eq(BUCKET_ID + 1));
     verify(mockMutator, times(2)).update(RECORD);
     verify(mockMutator).delete(RECORD);
     verify(mockMutator).insert(RECORD);
@@ -197,7 +197,7 @@ public class TestMutatorCoordinator {
     coordinator.delete(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }
@@ -210,7 +210,7 @@ public class TestMutatorCoordinator {
     coordinator.delete(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }