You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/02/23 16:31:00 UTC

[02/21] hive git commit: HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index a90b7d4..ba006cf 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -286,8 +286,9 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-          "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
+        String s = "select cq_id, cq_database, cq_table, cq_partition, "
+                + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '"
+                + READY_FOR_CLEANING + "'";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -302,7 +303,7 @@ class CompactionTxnHandler extends TxnHandler {
             default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
           }
           info.runAs = rs.getString(6);
-          info.highestTxnId = rs.getLong(7);
+          info.highestWriteId = rs.getLong(7);
           rc.add(info);
         }
         LOG.debug("Going to rollback");
@@ -338,7 +339,7 @@ class CompactionTxnHandler extends TxnHandler {
       ResultSet rs = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
         pStmt.setLong(1, info.id);
         rs = pStmt.executeQuery();
         if(rs.next()) {
@@ -358,21 +359,21 @@ class CompactionTxnHandler extends TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
-        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
         info.state = SUCCEEDED_STATE;
         CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
         updCount = pStmt.executeUpdate();
 
         // Remove entries from completed_txn_components as well, so we don't start looking there
-        // again but only up to the highest txn ID include in this compaction job.
-        //highestTxnId will be NULL in upgrade scenarios
+        // again but only up to the highest write ID included in this compaction job.
+        //highestWriteId will be NULL in upgrade scenarios
         s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
             "ctc_table = ?";
         if (info.partName != null) {
           s += " and ctc_partition = ?";
         }
-        if(info.highestTxnId != 0) {
-          s += " and ctc_txnid <= ?";
+        if(info.highestWriteId != 0) {
+          s += " and ctc_writeid <= ?";
         }
         pStmt = dbConn.prepareStatement(s);
         int paramCount = 1;
@@ -381,8 +382,8 @@ class CompactionTxnHandler extends TxnHandler {
         if (info.partName != null) {
           pStmt.setString(paramCount++, info.partName);
         }
-        if(info.highestTxnId != 0) {
-          pStmt.setLong(paramCount++, info.highestTxnId);
+        if(info.highestWriteId != 0) {
+          pStmt.setLong(paramCount++, info.highestWriteId);
         }
         LOG.debug("Going to execute update <" + s + ">");
         if (pStmt.executeUpdate() < 1) {
@@ -392,15 +393,15 @@ class CompactionTxnHandler extends TxnHandler {
 
         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
           TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
-        if (info.highestTxnId != 0) s += " and txn_id <= ?";
+        if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
         if (info.partName != null) s += " and tc_partition = ?";
 
         pStmt = dbConn.prepareStatement(s);
         paramCount = 1;
         pStmt.setString(paramCount++, info.dbname);
         pStmt.setString(paramCount++, info.tableName);
-        if(info.highestTxnId != 0) {
-          pStmt.setLong(paramCount++, info.highestTxnId);
+        if(info.highestWriteId != 0) {
+          pStmt.setLong(paramCount++, info.highestWriteId);
         }
         if (info.partName != null) {
           pStmt.setString(paramCount++, info.partName);
@@ -700,14 +701,14 @@ class CompactionTxnHandler extends TxnHandler {
    */
   @Override
   @RetrySemantics.Idempotent
-  public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
+  public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_TXN_ID = " + highestTxnId +
+        int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId +
           " WHERE CQ_ID = " + ci.id);
         if(updCount != 1) {
           throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
@@ -715,14 +716,14 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn.commit();
       } catch (SQLException e) {
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + highestTxnId + ")");
+        checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")");
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
         close(null, stmt, dbConn);
       }
     } catch (RetryException ex) {
-      setCompactionHighestTxnId(ci, highestTxnId);
+      setCompactionHighestWriteId(ci, highestWriteId);
     }
   }
   private static class RetentionCounters {
@@ -932,7 +933,7 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
         pStmt.setLong(1, ci.id);
         rs = pStmt.executeQuery();
         if(rs.next()) {
@@ -966,7 +967,7 @@ class CompactionTxnHandler extends TxnHandler {
         close(rs, stmt, null);
         closeStmt(pStmt);
 
-        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
         CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
         int updCount = pStmt.executeUpdate();
         LOG.debug("Going to commit");

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index e724723..88f6346 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -86,16 +86,29 @@ public final class TxnDbUtil {
           "  TC_DATABASE varchar(128) NOT NULL," +
           "  TC_TABLE varchar(128)," +
           "  TC_PARTITION varchar(767)," +
-          "  TC_OPERATION_TYPE char(1) NOT NULL)");
+          "  TC_OPERATION_TYPE char(1) NOT NULL," +
+          "  TC_WRITEID bigint)");
       stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
           "  CTC_TXNID bigint," +
           "  CTC_DATABASE varchar(128) NOT NULL," +
           "  CTC_TABLE varchar(128)," +
           "  CTC_PARTITION varchar(767)," +
           "  CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," +
-          "  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)");
+          "  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," +
+          "  CTC_WRITEID bigint)");
       stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT NULL)");
       stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+
+      stmt.execute("CREATE TABLE TXN_TO_WRITE_ID (" +
+          " T2W_TXNID bigint NOT NULL," +
+          " T2W_DATABASE varchar(128) NOT NULL," +
+          " T2W_TABLE varchar(256) NOT NULL," +
+          " T2W_WRITEID bigint NOT NULL)");
+      stmt.execute("CREATE TABLE NEXT_WRITE_ID (" +
+          " NWI_DATABASE varchar(128) NOT NULL," +
+          " NWI_TABLE varchar(256) NOT NULL," +
+          " NWI_NEXT bigint NOT NULL)");
+
       stmt.execute("CREATE TABLE HIVE_LOCKS (" +
           " HL_LOCK_EXT_ID bigint NOT NULL," +
           " HL_LOCK_INT_ID bigint NOT NULL," +
@@ -130,7 +143,7 @@ public final class TxnDbUtil {
           " CQ_WORKER_ID varchar(128)," +
           " CQ_START bigint," +
           " CQ_RUN_AS varchar(128)," +
-          " CQ_HIGHEST_TXN_ID bigint," +
+          " CQ_HIGHEST_WRITE_ID bigint," +
           " CQ_META_INFO varchar(2048) for bit data," +
           " CQ_HADOOP_JOB_ID varchar(32))");
 
@@ -138,20 +151,20 @@ public final class TxnDbUtil {
       stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
       
       stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
-        " CC_ID bigint PRIMARY KEY," +
-        " CC_DATABASE varchar(128) NOT NULL," +
-        " CC_TABLE varchar(128) NOT NULL," +
-        " CC_PARTITION varchar(767)," +
-        " CC_STATE char(1) NOT NULL," +
-        " CC_TYPE char(1) NOT NULL," +
-        " CC_TBLPROPERTIES varchar(2048)," +
-        " CC_WORKER_ID varchar(128)," +
-        " CC_START bigint," +
-        " CC_END bigint," +
-        " CC_RUN_AS varchar(128)," +
-        " CC_HIGHEST_TXN_ID bigint," +
-        " CC_META_INFO varchar(2048) for bit data," +
-        " CC_HADOOP_JOB_ID varchar(32))");
+          " CC_ID bigint PRIMARY KEY," +
+          " CC_DATABASE varchar(128) NOT NULL," +
+          " CC_TABLE varchar(128) NOT NULL," +
+          " CC_PARTITION varchar(767)," +
+          " CC_STATE char(1) NOT NULL," +
+          " CC_TYPE char(1) NOT NULL," +
+          " CC_TBLPROPERTIES varchar(2048)," +
+          " CC_WORKER_ID varchar(128)," +
+          " CC_START bigint," +
+          " CC_END bigint," +
+          " CC_RUN_AS varchar(128)," +
+          " CC_HIGHEST_WRITE_ID bigint," +
+          " CC_META_INFO varchar(2048) for bit data," +
+          " CC_HADOOP_JOB_ID varchar(32))");
       
       stmt.execute("CREATE TABLE AUX_TABLE (" +
         " MT_KEY1 varchar(128) NOT NULL," +
@@ -219,6 +232,8 @@ public final class TxnDbUtil {
         success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
         success &= dropTable(stmt, "TXNS", retryCount);
         success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+        success &= dropTable(stmt, "TXN_TO_WRITE_ID", retryCount);
+        success &= dropTable(stmt, "NEXT_WRITE_ID", retryCount);
         success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
         success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
         success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 1bb976c..ac61715 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -61,6 +61,7 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.DatabaseProduct;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
 import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
@@ -80,6 +83,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
 import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
 import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
@@ -102,10 +107,12 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -819,8 +826,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // Move the record from txn_components into completed_txn_components so that the compactor
         // knows where to look to compact.
         String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
-          "ctc_table, ctc_partition) select tc_txnid, tc_database, tc_table, " +
-          "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
+            "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
+            "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute insert <" + s + ">");
         int modCount = 0;
         if ((modCount = stmt.executeUpdate(s)) < 1) {
@@ -869,6 +876,244 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   @Override
+  @RetrySemantics.ReadOnly
+  public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
+          throws NoSuchTxnException, MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      ValidTxnList validTxnList;
+
+      // The valid write ids list is derived from the validTxnList snapshot of the current txn.
+      // If the request does not carry a validTxnList (no txn exists in the caller), then the
+      // current state of open txns is used to build the validTxnList instead.
+      if (rqst.isSetValidTxnList()) {
+        validTxnList = new ValidReadTxnList(rqst.getValidTxnList());
+      } else {
+        // Passing 0 for currentTxn means this validTxnList is not relative to any particular txn
+        validTxnList = TxnUtils.createValidReadTxnList(getOpenTxns(), 0);
+      }
+      try {
+        /**
+         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+         */
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        // Compute the valid write id list for every table named in the request
+        List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
+        for (String fullTableName : rqst.getFullTableNames()) {
+          tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList));
+        }
+
+        LOG.debug("Going to rollback");
+        dbConn.rollback(); // only selects were issued, so there is nothing to commit
+        GetValidWriteIdsResponse owr = new GetValidWriteIdsResponse(tblValidWriteIdsList);
+        return owr;
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "getValidWriteIds");
+        throw new MetaException("Unable to select from transaction database, "
+                + StringUtils.stringifyException(e));
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return getValidWriteIds(rqst); // retryable failure: run the whole call again
+    }
+  }
+
+  // Builds the valid write ids list for the given table from TXN_TO_WRITE_ID and validTxnList.
+  // Input fullTableName is expected to be of the format <db_name>.<table_name>
+  private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName,
+                                               ValidTxnList validTxnList) throws SQLException {
+    ResultSet rs = null;
+    String[] names = TxnUtils.getDbTableName(fullTableName);
+    try {
+      // Start the write id high water mark at 0 so that, if nobody has modified this table,
+      // the current txn doesn't read any data
+      long writeIdHwm = 0;
+      List<Long> invalidWriteIdList = new ArrayList<>();
+      long txnHwm = validTxnList.getHighWatermark();
+
+      // The query returns every txn at or below the txn high water mark that has a write id for
+      // this table, including committed txns. The results are sorted in ascending order of
+      // write id. The sorting is needed because the exceptions list in ValidWriteIdList is
+      // looked up using binary search.
+      String s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm
+              + " and t2w_database = " + quoteString(names[0])
+              + " and t2w_table = " + quoteString(names[1])
+              + " order by t2w_writeid asc";
+      LOG.debug("Going to execute query<" + s + ">");
+      rs = stmt.executeQuery(s);
+      long minOpenWriteId = Long.MAX_VALUE;
+      BitSet abortedBits = new BitSet();
+      while (rs.next()) {
+        long txnId = rs.getLong(1);
+        long writeId = rs.getLong(2);
+        writeIdHwm = Math.max(writeIdHwm, writeId);
+        if (validTxnList.isTxnValid(txnId)) {
+          // Committed txn: its write id is valid, so it is not added to the invalid list.
+          continue;
+        }
+
+        // Otherwise the txn is either in open or aborted state.
+        // Record its write id as invalid and track whether it is aborted or open.
+        if (validTxnList.isTxnAborted(txnId)) {
+          invalidWriteIdList.add(writeId);
+          abortedBits.set(invalidWriteIdList.size() - 1); // bit index = position in invalidWriteIdList
+        } else {
+          invalidWriteIdList.add(writeId);
+          minOpenWriteId = Math.min(minOpenWriteId, writeId);
+        }
+      }
+
+      ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
+      TableValidWriteIds owi = new TableValidWriteIds(fullTableName, writeIdHwm, invalidWriteIdList, byteBuffer);
+      if (minOpenWriteId < Long.MAX_VALUE) { // only set when at least one open write id was seen
+        owi.setMinOpenWriteId(minOpenWriteId);
+      }
+      return owi;
+    } finally {
+      close(rs);
+    }
+  }
+
+  @Override
+  public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+          throws NoSuchTxnException, TxnAbortedException, MetaException {
+    List<Long> txnIds = rqst.getTxnIds();
+    String dbName = rqst.getDbName().toLowerCase();
+    String tblName = rqst.getTableName().toLowerCase();
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        Collections.sort(txnIds); // sorted only so that the logs are easier to read
+
+        // Check that all the input txns are in open state. Write IDs are allocated only for open transactions.
+        if (!isTxnsInOpenState(txnIds, stmt)) {
+          ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
+          throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+        }
+
+        List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
+        List<Long> allocatedTxns = new ArrayList<>();
+        long txnId;
+        long writeId;
+        List<String> queries = new ArrayList<>();
+        StringBuilder prefix = new StringBuilder();
+        StringBuilder suffix = new StringBuilder();
+
+        // Look in TXN_TO_WRITE_ID for input txns that already have a write id allocated for the
+        // same db.table. If one exists, it is reused; otherwise a new one has to be allocated.
+        // A write id will already have been allocated in the case of multi-statement txns, where
+        // the first write on a table allocates the write id and the rest of the writes re-use it.
+        prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
+                        + " t2w_database = " + quoteString(dbName)
+                        + " and t2w_table = " + quoteString(tblName) + " and ");
+        suffix.append("");
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+                txnIds, "t2w_txnid", false, false);
+        for (String query : queries) {
+          LOG.debug("Going to execute query <" + query + ">");
+          rs = stmt.executeQuery(query);
+          while (rs.next()) {
+            // A table write ID is already allocated for this transaction, so just reuse it
+            txnId = rs.getLong(1);
+            writeId = rs.getLong(2);
+            txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+            allocatedTxns.add(txnId);
+            LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
+          }
+        }
+
+        // Count the txns that still need a write id; if there are none, new allocation is skipped
+        long numOfWriteIds = txnIds.size() - allocatedTxns.size();
+        assert(numOfWriteIds >= 0);
+        if (0 == numOfWriteIds) {
+          // Every txn in the list has a pre-allocated write id for the given table, so just return them
+          return new AllocateTableWriteIdsResponse(txnToWriteIds);
+        }
+
+        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+
+        // Some txns in the list have no write id allocated yet, so go ahead and allocate them.
+        // Get the next write id for the given table and update it with the new next write id.
+        // This is a select-for-update query, which takes a lock if the table entry already exists in NEXT_WRITE_ID
+        String s = sqlGenerator.addForUpdateClause(
+                "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+                        + " and nwi_table = " + quoteString(tblName));
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          // The first allocation of a write id adds the table to the NEXT_WRITE_ID meta table.
+          // Write ids start at 1, so the stored next value is 1 plus the number of write ids allocated here
+          s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+                  + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")";
+          LOG.debug("Going to execute insert <" + s + ">");
+          stmt.execute(s);
+          writeId = 1;
+        } else {
+          // Advance NEXT_WRITE_ID for the given table by the number of write ids allocated here
+          writeId = rs.getLong(1);
+          s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
+                  + " where nwi_database = " + quoteString(dbName)
+                  + " and nwi_table = " + quoteString(tblName);
+          LOG.debug("Going to execute update <" + s + ">");
+          stmt.executeUpdate(s);
+        }
+
+        // Map the newly allocated write ids to the txns in the list that don't have pre-allocated
+        // write ids
+        List<String> rows = new ArrayList<>();
+        for (long txn : txnIds) {
+          if (allocatedTxns.contains(txn)) {
+            continue;
+          }
+          rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+          txnToWriteIds.add(new TxnToWriteId(txn, writeId));
+          LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
+          writeId++;
+        }
+
+        // Insert entries into TXN_TO_WRITE_ID for the newly allocated write ids
+        List<String> inserts = sqlGenerator.createInsertValuesStmt(
+                "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
+        for (String insert : inserts) {
+          LOG.debug("Going to execute insert <" + insert + ">");
+          stmt.execute(insert);
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        return new AllocateTableWriteIdsResponse(txnToWriteIds);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
+        throw new MetaException("Unable to update transaction database "
+                + StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      return allocateTableWriteIds(rqst); // retryable failure: run the whole call again
+    }
+  }
+
+  @Override
   @RetrySemantics.SafeToRetry
   public void performWriteSetGC() {
     Connection dbConn = null;
@@ -1122,13 +1367,30 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             String dbName = lc.getDbname();
             String tblName = lc.getTablename();
             String partName = lc.getPartitionname();
+            Long writeId = null;
+            if (tblName != null) {
+              // It is assumed the caller has already allocated a write id for adding/updating data
+              // in the acid tables. However, DDL operations won't allocate a write id and hence this
+              // query may return an empty result set.
+              // Get the write id allocated by this txn for writes to the given table
+              s = "select t2w_writeid from TXN_TO_WRITE_ID where"
+                      + " t2w_database = " + quoteString(dbName.toLowerCase())
+                      + " and t2w_table = " + quoteString(tblName.toLowerCase())
+                      + " and t2w_txnid = " + txnid;
+              LOG.debug("Going to execute query <" + s + ">");
+              rs = stmt.executeQuery(s);
+              if (rs.next()) {
+                writeId = rs.getLong(1);
+              }
+            }
             rows.add(txnid + ", '" + dbName + "', " +
-              (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-              (partName == null ? "null" : "'" + partName + "'")+ "," +
-              quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString()));
+                    (tblName == null ? "null" : "'" + tblName + "'") + ", " +
+                    (partName == null ? "null" : "'" + partName + "'")+ "," +
+                    quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
+                    (writeId == null ? "null" : writeId));
           }
           List<String> queries = sqlGenerator.createInsertValuesStmt(
-            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
+              "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
           for(String query : queries) {
             LOG.debug("Going to execute update <" + query + ">");
             int modCount = stmt.executeUpdate(query);
@@ -1810,7 +2072,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet lockHandle = null;
-    ResultSet rs = null;
     try {
       try {
         lockInternal();
@@ -1827,15 +2088,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if(rqst.isSetOperationType()) {
           ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
         }
+
+        Long writeId = rqst.getWriteid();
         List<String> rows = new ArrayList<>();
         for (String partName : rqst.getPartitionnames()) {
           rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
-            "," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
+              "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
         }
         int modCount = 0;
         //record partitions that were written to
         List<String> queries = sqlGenerator.createInsertValuesStmt(
-          "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
+            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
         for(String query : queries) {
           LOG.debug("Going to execute update <" + query + ">");
           modCount = stmt.executeUpdate(query);
@@ -1880,7 +2143,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         StringBuilder buff = new StringBuilder();
 
         switch (type) {
-          case DATABASE:
+          case DATABASE: {
             dbName = db.getName();
 
             buff.append("delete from TXN_COMPONENTS where tc_database='");
@@ -1906,8 +2169,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             buff.append("'");
             queries.add(buff.toString());
 
+            buff.setLength(0);
+            buff.append("delete from TXN_TO_WRITE_ID where t2w_database='");
+            buff.append(dbName.toLowerCase());
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from NEXT_WRITE_ID where nwi_database='");
+            buff.append(dbName.toLowerCase());
+            buff.append("'");
+            queries.add(buff.toString());
+
             break;
-          case TABLE:
+          }
+          case TABLE: {
             dbName = table.getDbName();
             tblName = table.getTableName();
 
@@ -1942,8 +2218,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             buff.append("'");
             queries.add(buff.toString());
 
+            buff.setLength(0);
+            buff.append("delete from TXN_TO_WRITE_ID where t2w_database='");
+            buff.append(dbName.toLowerCase());
+            buff.append("' and t2w_table='");
+            buff.append(tblName.toLowerCase());
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from NEXT_WRITE_ID where nwi_database='");
+            buff.append(dbName.toLowerCase());
+            buff.append("' and nwi_table='");
+            buff.append(tblName.toLowerCase());
+            buff.append("'");
+            queries.add(buff.toString());
+
             break;
-          case PARTITION:
+          }
+          case PARTITION: {
             dbName = table.getDbName();
             tblName = table.getTableName();
             List<FieldSchema> partCols = table.getPartitionKeys();  // partition columns
@@ -1996,8 +2289,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             }
 
             break;
-          default:
+          }
+          default: {
             throw new MetaException("Invalid object type for cleanup: " + type);
+          }
         }
 
         for (String query : queries) {
@@ -3003,6 +3298,115 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   /**
+   * Checks whether all the txns in the list are in open state.
+   * @param txnIds list of txns to be evaluated for open state
+   * @param stmt db statement used to run the queries; it is not closed by this method
+   * @return true if the number of open txns found equals the size of the input list, else false
+   * @throws SQLException if any of the count queries fails
+   */
+  private boolean isTxnsInOpenState(List<Long> txnIds, Statement stmt) throws SQLException {
+    List<String> queries = new ArrayList<>();
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+
+    // Count the input txns that are in open state.  The IN clause may be split across several
+    // queries, so sum the per-query counts; each result set is closed before the next query runs.
+    prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN + "' and ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "txn_id", false, false);
+
+    long count = 0;
+    for (String query : queries) {
+      LOG.debug("Going to execute query <" + query + ">");
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        if (rs.next()) {
+          count += rs.getLong(1);
+        }
+      }
+    }
+    return count == txnIds.size();
+  }
+
+  /**
+   * Verifies that all the txns in the list are in open state and fails the write id allocation
+   * otherwise.
+   * @param dbName Database name
+   * @param tblName Table on which we try to allocate write id
+   * @param txnIds list of txns to be evaluated for open state
+   * @param stmt db statement
+   * @throws SQLException if any of the validation queries fails
+   * @throws IllegalStateException if any input txn is in a non-open state or already committed
+   */
+  private void ensureAllTxnsValid(String dbName, String tblName, List<Long> txnIds, Statement stmt)
+          throws SQLException {
+    List<String> queries = new ArrayList<>();
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+
+    // Look for input txns that are present in TXNS but are not in open state (e.g. aborted).
+    prefix.append("select txn_id, txn_state from TXNS where ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "txn_id", false, false);
+    boolean isAborted = false;
+    StringBuilder errorMsg = new StringBuilder();
+    errorMsg.append("Write ID allocation on ")
+            .append(Warehouse.getQualifiedName(dbName, tblName))
+            .append(" failed for input txns: ");
+    for (String query : queries) {
+      LOG.debug("Going to execute query <" + query + ">");
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        while (rs.next()) {
+          long txnId = rs.getLong(1);
+          char txnState = rs.getString(2).charAt(0);
+          if (txnState != TXN_OPEN) {
+            isAborted = true;
+            errorMsg.append("{").append(txnId).append(",").append(txnState).append("}");
+          }
+        }
+      }
+    }
+    // Check if any of the txns in the list is committed.
+    boolean isCommitted = checkIfTxnsCommitted(txnIds, stmt, errorMsg);
+    if (isAborted || isCommitted) {
+      LOG.error(errorMsg.toString());
+      throw new IllegalStateException("Write ID allocation failed on "
+              + Warehouse.getQualifiedName(dbName, tblName)
+              + " as not all input txns in open state");
+    }
+  }
+
+  /**
+   * Checks if any of the txns in the list is committed, i.e. has rows in COMPLETED_TXN_COMPONENTS.
+   * @param txnIds list of txns to be evaluated for committed
+   * @param stmt db statement
+   * @param errorMsg if not null, "{txnId,c}" is appended to it for every committed row found
+   * @return true if any input txn is committed, else false
+   * @throws SQLException if any of the lookup queries fails
+   */
+  private boolean checkIfTxnsCommitted(List<Long> txnIds, Statement stmt, StringBuilder errorMsg)
+          throws SQLException {
+    List<String> queries = new ArrayList<>();
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+
+    // Collect the input txns that are committed; the caller decides whether that is an error.
+    prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "ctc_txnid", false, false);
+    boolean isCommitted = false;
+    for (String query : queries) {
+      LOG.debug("Going to execute query <" + query + ">");
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        while (rs.next()) {
+          isCommitted = true;
+          long txnId = rs.getLong(1);
+          if (errorMsg != null) {
+            errorMsg.append("{").append(txnId).append(",c}");
+          }
+        }
+      }
+    }
+    return isCommitted;
+  }
+
+  /**
    * Used to raise an informative error when the caller expected a txn in a particular TxnStatus
    * but found it in some other status
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3e27034..38fa0e2 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -38,8 +38,10 @@ import java.util.Set;
 @InterfaceStability.Evolving
 public interface TxnStore extends Configurable {
 
-  enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
-    WriteSetCleaner, CompactionScheduler}
+  enum MUTEX_KEY {
+    Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
+    WriteSetCleaner, CompactionScheduler, WriteIdAllocator
+  }
   // Compactor states (Should really be enum)
   String INITIATED_RESPONSE = "initiated";
   String WORKING_RESPONSE = "working";
@@ -123,6 +125,25 @@ public interface TxnStore extends Configurable {
   public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
       String inputDbName, String inputTableName, ValidTxnList txnList)
           throws MetaException;
+  /**
+   * Gets the list of valid write ids for the given tables with respect to the current txn.
+   * @param rqst info on transaction and list of table names associated with given transaction
+   * @throws NoSuchTxnException
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
+          throws NoSuchTxnException,  MetaException;
+
+  /**
+   * Allocate a write ID for the given table and associate it with a transaction
+   * @param rqst info on transaction and table to allocate write id
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+    throws NoSuchTxnException, TxnAbortedException, MetaException;
 
   /**
    * Obtain a lock.
@@ -206,7 +227,7 @@ public interface TxnStore extends Configurable {
   CompactionResponse compact(CompactionRequest rqst) throws MetaException;
 
   /**
-   * Show list of current compactions
+   * Show list of current compactions.
    * @param rqst info on which compactions to show
    * @return compaction information
    * @throws MetaException
@@ -226,7 +247,7 @@ public interface TxnStore extends Configurable {
       throws NoSuchTxnException,  TxnAbortedException, MetaException;
 
   /**
-   * Clean up corresponding records in metastore tables
+   * Clean up corresponding records in metastore tables.
    * @param type Hive object type
    * @param db database object
    * @param table table object
@@ -350,10 +371,10 @@ public interface TxnStore extends Configurable {
   List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
 
   /**
-   * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   * Record the highest write id that the {@code ci} compaction job will pay attention to.
    */
   @RetrySemantics.Idempotent
-  void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
+  void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException;
 
   /**
    * For any given compactable entity (partition, table if not partitioned) the history of compactions

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 027fb3f..7b02865 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -18,15 +18,16 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -63,53 +64,94 @@ public class TxnUtils {
     BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
     long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
     int i = 0;
-    for(long txn: open) {
+    for (long txn : open) {
       if (currentTxn > 0 && currentTxn == txn) continue;
       exceptions[i++] = txn;
     }
-    if(txns.isSetMin_open_txn()) {
+    if (txns.isSetMin_open_txn()) {
       return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn());
-    }
-    else {
+    } else {
       return new ValidReadTxnList(exceptions, abortedBits, highWater);
     }
   }
 
   /**
-   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
-   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
-   * compact the files, and thus treats only open transactions as invalid.  Additionally any
-   * txnId &gt; highestOpenTxnId is also invalid.  This is to avoid creating something like
-   * delta_17_120 where txnId 80, for example, is still open.
-   * @param txns txn list from the metastore
-   * @return a valid txn list.
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}.  This assumes that the caller intends to
+   * read the files, and thus treats both open and aborted transactions as invalid.
+   * @param currentTxnId current txn ID for which we get the valid write ids list
+   * @param validWriteIds valid write ids list from the metastore
+   * @return a valid write IDs list for the whole transaction.
    */
-  public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
-    //highWater is the last txn id that has been allocated
-    long highWater = txns.getTxn_high_water_mark();
-    long minOpenTxn = Long.MAX_VALUE;
-    long[] exceptions = new long[txns.getOpen_txnsSize()];
+  public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
+                                                              GetValidWriteIdsResponse validWriteIds) {
+    ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
+    for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) {
+      validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
+    }
+    return validTxnWriteIdList;
+  }
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+   * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}.  This assumes that the caller intends to
+   * read the files, and thus treats both open and aborted write ids as invalid.
+   * @param tableWriteIds valid write ids for the given table from the metastore
+   * @return a valid write IDs list for the input table
+   */
+  public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) {
+    String fullTableName = tableWriteIds.getFullTableName();
+    long highWater = tableWriteIds.getWriteIdHighWaterMark();
+    List<Long> invalids = tableWriteIds.getInvalidWriteIds();
+    BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
+    long[] exceptions = new long[invalids.size()];
     int i = 0;
-    for (TxnInfo txn : txns.getOpen_txns()) {
-      if (txn.getState() == TxnState.OPEN) {
-        minOpenTxn = Math.min(minOpenTxn, txn.getId());
-      }
-      else {
-        //only need aborted since we don't consider anything above minOpenTxn
-        exceptions[i++] = txn.getId();
+    for (long writeId : invalids) {
+      exceptions[i++] = writeId;
+    }
+    if (tableWriteIds.isSetMinOpenWriteId()) {
+      return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater,
+                                        tableWriteIds.getMinOpenWriteId());
+    } else {
+      return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater);
+    }
+  }
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+   * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}.  This assumes that the caller intends to
+   * compact the files, and thus records only aborted write ids; bit k of the aborted bits is read for the
+   * k-th invalid write id.  If some write id is still open, the high water mark is capped just below the
+   * lowest open one.  This avoids creating something like delta_17_120 where writeId 80 is still open.
+   * @param tableValidWriteIds table write id list from the metastore
+   * @return a valid write id list.
+   */
+  public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) {
+    String fullTableName = tableValidWriteIds.getFullTableName();
+    long highWater = tableValidWriteIds.getWriteIdHighWaterMark();
+    long minOpenWriteId = Long.MAX_VALUE;
+    List<Long> invalids = tableValidWriteIds.getInvalidWriteIds();
+    BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits());
+    long[] exceptions = new long[invalids.size()];
+    int i = 0;
+    for (int pos = 0; pos < invalids.size(); pos++) {
+      if (!abortedBits.get(pos)) {
+        minOpenWriteId = Math.min(minOpenWriteId, invalids.get(pos));
+      } else if (invalids.get(pos) < minOpenWriteId) {
+        // Bit 'pos' flags invalids[pos]; only aborted ids below the lowest open write id are needed
+        exceptions[i++] = invalids.get(pos);
       }
     }
     if(i < exceptions.length) {
       exceptions = Arrays.copyOf(exceptions, i);
     }
-    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+    highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1;
     BitSet bitSet = new BitSet(exceptions.length);
-    bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted
-    if(minOpenTxn == Long.MAX_VALUE) {
-      return new ValidCompactorTxnList(exceptions, bitSet, highWater);
-    }
-    else {
-      return new ValidCompactorTxnList(exceptions, bitSet, highWater, minOpenTxn);
+    bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions are aborted
+    if (minOpenWriteId == Long.MAX_VALUE) {
+      return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater);
+    } else {
+      return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId);
     }
   }
 
@@ -134,7 +176,7 @@ public class TxnUtils {
    * Note, users are responsible for using the correct TxnManager. We do not look at
    * SessionState.get().getTxnMgr().supportsAcid() here
    * Should produce the same result as
-   * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}
+   * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}.
    * @return true if table is a transactional table, false otherwise
    */
   public static boolean isTransactionalTable(Table table) {
@@ -148,7 +190,7 @@ public class TxnUtils {
 
   /**
    * Should produce the same result as
-   * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}
+   * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}.
    */
   public static boolean isAcidTable(Table table) {
     return TxnUtils.isTransactionalTable(table) &&
@@ -157,6 +199,19 @@ public class TxnUtils {
   }
 
   /**
+   * Builds the lower-cased full table name in the form {@code <dbName>.<tableName>}.
+   */
+  public static String getFullTableName(String dbName, String tableName) {
+    return dbName.toLowerCase() + "." + tableName.toLowerCase();
+  }
+
+  public static String[] getDbTableName(String fullTableName) {
+    return fullTableName.split("\\.");
+  }
+
+
+
+  /**
    * Build a query (or queries if one query is too big but only for the case of 'IN' 
    * composite clause. For the case of 'NOT IN' clauses, multiple queries change
    * the semantics of the intended query.
@@ -357,7 +412,7 @@ public class TxnUtils {
     return ret;
   }
 
-  /*
+  /**
    * Compute and return the size of a query statement with the given parameters as input variables.
    *
    * @param sizeSoFar     size of the current contents of the buf

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
index ac28869..9d8a703 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
@@ -422,7 +422,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
   TC_PARTITION varchar(767),
-  TC_OPERATION_TYPE char(1) NOT NULL
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID bigint
 );
 
 CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
@@ -432,7 +433,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE varchar(128) NOT NULL,
   CTC_TABLE varchar(256),
   CTC_PARTITION varchar(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID bigint
 );
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -480,7 +482,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START bigint,
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID bigint,
+  CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO varchar(2048) for bit data,
   CQ_HADOOP_JOB_ID varchar(32)
 );
@@ -502,7 +504,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START bigint,
   CC_END bigint,
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID bigint,
+  CC_HIGHEST_WRITE_ID bigint,
   CC_META_INFO varchar(2048) for bit data,
   CC_HADOOP_JOB_ID varchar(32)
 );
@@ -525,6 +527,23 @@ CREATE TABLE WRITE_SET (
   WS_OPERATION_TYPE char(1) NOT NULL
 );
 
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index d49255a..a50c45d 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -94,3 +94,29 @@ UPDATE SDS
 UPDATE DBS
   SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4)
   WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- 050-HIVE-18192.derby.sql
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+RENAME COLUMN COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+RENAME COLUMN COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
index 7c26d5d..1b7d0da 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
@@ -969,7 +969,7 @@ CREATE TABLE COMPACTION_QUEUE(
 	CQ_WORKER_ID nvarchar(128) NULL,
 	CQ_START bigint NULL,
 	CQ_RUN_AS nvarchar(128) NULL,
-	CQ_HIGHEST_TXN_ID bigint NULL,
+    CQ_HIGHEST_WRITE_ID bigint NULL,
     CQ_META_INFO varbinary(2048) NULL,
 	CQ_HADOOP_JOB_ID nvarchar(128) NULL,
 PRIMARY KEY CLUSTERED 
@@ -990,7 +990,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 	CC_START bigint NULL,
 	CC_END bigint NULL,
 	CC_RUN_AS nvarchar(128) NULL,
-	CC_HIGHEST_TXN_ID bigint NULL,
+    CC_HIGHEST_WRITE_ID bigint NULL,
     CC_META_INFO varbinary(2048) NULL,
 	CC_HADOOP_JOB_ID nvarchar(128) NULL,
 PRIMARY KEY CLUSTERED 
@@ -1004,7 +1004,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS(
 	CTC_DATABASE nvarchar(128) NOT NULL,
 	CTC_TABLE nvarchar(128) NULL,
 	CTC_PARTITION nvarchar(767) NULL,
-  CTC_TIMESTAMP datetime2 DEFAULT CURRENT_TIMESTAMP NOT NULL
+    CTC_TIMESTAMP datetime2 DEFAULT CURRENT_TIMESTAMP NOT NULL,
+    CTC_WRITEID bigint
 );
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX2 ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -1072,7 +1073,8 @@ CREATE TABLE TXN_COMPONENTS(
 	TC_DATABASE nvarchar(128) NOT NULL,
 	TC_TABLE nvarchar(128) NULL,
 	TC_PARTITION nvarchar(767) NULL,
-	TC_OPERATION_TYPE char(1) NOT NULL
+    TC_OPERATION_TYPE char(1) NOT NULL,
+    TC_WRITEID bigint
 );
 
 ALTER TABLE TXN_COMPONENTS  WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -1129,6 +1131,23 @@ CREATE TABLE METASTORE_DB_PROPERTIES (
 
 ALTER TABLE METASTORE_DB_PROPERTIES ADD CONSTRAINT PROPERTY_KEY_PK PRIMARY KEY (PROPERTY_KEY);
 
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE nvarchar(128) NOT NULL,
+  T2W_TABLE nvarchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE nvarchar(128) NOT NULL,
+  NWI_TABLE nvarchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
index 6dc3e1a..8ab466d 100644
--- a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
@@ -148,3 +148,29 @@ UPDATE SDS
 UPDATE DBS
   SET DB_LOCATION_URI = 's3a' + SUBSTRING(DB_LOCATION_URI, 4, LEN(DB_LOCATION_URI))
   WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE nvarchar(128) NOT NULL,
+  T2W_TABLE nvarchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE nvarchar(128) NOT NULL,
+  NWI_TABLE nvarchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+EXEC SP_RENAME 'COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID', 'CQ_HIGHEST_WRITE_ID', 'COLUMN';
+
+EXEC SP_RENAME 'COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID', 'CC_HIGHEST_WRITE_ID',  'COLUMN';
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
index 0eb2e2e..886c932 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
@@ -962,6 +962,7 @@ CREATE TABLE TXN_COMPONENTS (
   TC_TABLE varchar(128) NOT NULL,
   TC_PARTITION varchar(767),
   TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID bigint,
   FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
@@ -972,7 +973,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE varchar(128) NOT NULL,
   CTC_TABLE varchar(256),
   CTC_PARTITION varchar(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID bigint
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX2 ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION) USING BTREE;
@@ -1021,7 +1023,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START bigint,
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID bigint,
+  CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO varbinary(2048),
   CQ_HADOOP_JOB_ID varchar(32)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -1038,7 +1040,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START bigint,
   CC_END bigint,
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID bigint,
+  CC_HIGHEST_WRITE_ID bigint,
   CC_META_INFO varbinary(2048),
   CC_HADOOP_JOB_ID varchar(32)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -1063,6 +1065,24 @@ CREATE TABLE WRITE_SET (
   WS_COMMIT_ID bigint NOT NULL,
   WS_OPERATION_TYPE char(1) NOT NULL
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
index 0a170f6..a537734 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
@@ -133,3 +133,29 @@ UPDATE SDS
 UPDATE DBS
   SET DB_LOCATION_URI = CONCAT('s3a', SUBSTR(DB_LOCATION_URI, 4, LENGTH(DB_LOCATION_URI)))
   WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192: write id tables; engine/charset pinned to match hive-schema-3.0.0.mysql.sql
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE CHANGE `CQ_HIGHEST_TXN_ID` `CQ_HIGHEST_WRITE_ID` bigint;
+
+ALTER TABLE COMPLETED_COMPACTIONS CHANGE `CC_HIGHEST_TXN_ID` `CC_HIGHEST_WRITE_ID` bigint;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
index 37f9063..366b2d9 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
@@ -936,7 +936,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE VARCHAR2(128) NOT NULL,
   TC_TABLE VARCHAR2(256),
   TC_PARTITION VARCHAR2(767) NULL,
-  TC_OPERATION_TYPE char(1) NOT NULL
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
@@ -946,7 +947,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE VARCHAR2(128) NOT NULL,
   CTC_TABLE VARCHAR2(128),
   CTC_PARTITION VARCHAR2(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -994,7 +996,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START NUMBER(19),
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID NUMBER(19),
+  CQ_HIGHEST_WRITE_ID NUMBER(19),
   CQ_META_INFO BLOB,
   CQ_HADOOP_JOB_ID varchar2(32)
 ) ROWDEPENDENCIES;
@@ -1016,7 +1018,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START NUMBER(19),
   CC_END NUMBER(19),
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID NUMBER(19),
+  CC_HIGHEST_WRITE_ID NUMBER(19),
   CC_META_INFO BLOB,
   CC_HADOOP_JOB_ID varchar2(32)
 ) ROWDEPENDENCIES;
@@ -1037,6 +1039,23 @@ CREATE TABLE WRITE_SET (
   WS_OPERATION_TYPE char(1) NOT NULL
 );
 
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID number(19) NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
index a923d92..bd786fb 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
@@ -156,3 +156,29 @@ UPDATE SDS
 UPDATE DBS
   SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4)
   WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID number(19) NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE RENAME COLUMN CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+ALTER TABLE COMPLETED_COMPACTIONS RENAME COLUMN CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID number(19);
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID number(19);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
index 9d63056..4abf24c 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
@@ -1628,7 +1628,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
   TC_PARTITION varchar(767) DEFAULT NULL,
-  TC_OPERATION_TYPE char(1) NOT NULL
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID bigint
 );
 
 CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS USING hash (TC_TXNID);
@@ -1638,7 +1639,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE varchar(128) NOT NULL,
   CTC_TABLE varchar(256),
   CTC_PARTITION varchar(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID bigint
 );
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS USING btree (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -1686,7 +1688,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START bigint,
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID bigint,
+  CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO bytea,
   CQ_HADOOP_JOB_ID varchar(32)
 );
@@ -1708,7 +1710,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START bigint,
   CC_END bigint,
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID bigint,
+  CC_HIGHEST_WRITE_ID bigint,
   CC_META_INFO bytea,
   CC_HADOOP_JOB_ID varchar(32)
 );
@@ -1729,6 +1731,23 @@ CREATE TABLE WRITE_SET (
   WS_OPERATION_TYPE char(1) NOT NULL
 );
 
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
index eb45cd2..34ed974 100644
--- a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
@@ -172,3 +172,29 @@ UPDATE "SDS"
 UPDATE "DBS"
   SET "DB_LOCATION_URI" = 's3a' || SUBSTR("DB_LOCATION_URI", 4)
   WHERE "DB_LOCATION_URI" LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE RENAME CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+ALTER TABLE COMPLETED_COMPACTIONS RENAME CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index 35fc8b3..b11ee38 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -731,6 +731,43 @@ struct CommitTxnRequest {
     1: required i64 txnid,
 }
 
+// Request msg to get the valid write ids list for the given list of tables wrt the input validTxnList
+struct GetValidWriteIdsRequest {
+    1: required list<string> fullTableNames, // Full table names of format <db_name>.<table_name>
+    2: required string validTxnList, // Valid txn list string wrt the current txn of the caller
+}
+
+// Valid write id list of one table wrt the current txn
+struct TableValidWriteIds {
+    1: required string fullTableName,  // Full table name of format <db_name>.<table_name>
+    2: required i64 writeIdHighWaterMark, // The highest write id valid for this table wrt the given txn
+    3: required list<i64> invalidWriteIds, // List of open and aborted write ids in the table
+    4: optional i64 minOpenWriteId, // Minimum write id which maps to an open txn
+    5: required binary abortedBits, // Bit array identifying which entries of invalidWriteIds are aborted
+}
+
+// Valid write id lists for all the input tables wrt the current txn
+struct GetValidWriteIdsResponse {
+    1: required list<TableValidWriteIds> tblValidWriteIds,
+}
+
+// Request msg to allocate write ids on one table for the given list of txns
+struct AllocateTableWriteIdsRequest {
+    1: required list<i64> txnIds, // Txns for which write ids are to be allocated
+    2: required string dbName, // Database of the target table
+    3: required string tableName, // Target table name
+}
+
+// Pairs a txn with the write id that was allocated to it
+struct TxnToWriteId {
+    1: required i64 txnId,
+    2: required i64 writeId,
+}
+
+struct AllocateTableWriteIdsResponse {
+    1: required list<TxnToWriteId> txnToWriteIds, // Txn id / write id pairs resulting from the allocation request
+}
+
 struct LockComponent {
     1: required LockType type,
     2: required LockLevel level,
@@ -850,10 +887,11 @@ struct ShowCompactResponse {
 
 struct AddDynamicPartitions {
     1: required i64 txnid,
-    2: required string dbname,
-    3: required string tablename,
-    4: required list<string> partitionnames,
-    5: optional DataOperationType operationType = DataOperationType.UNSET
+    2: required i64 writeid, // NOTE(review): inserting here renumbers the former fields 2-5; confirm no older client/server exchanges this struct
+    3: required string dbname,
+    4: required string tablename,
+    5: required list<string> partitionnames,
+    6: optional DataOperationType operationType = DataOperationType.UNSET
 }
 
 struct BasicTxnInfo {
@@ -1807,6 +1845,10 @@ service ThriftHiveMetastore extends fb303.FacebookService
   void abort_txn(1:AbortTxnRequest rqst) throws (1:NoSuchTxnException o1)
   void abort_txns(1:AbortTxnsRequest rqst) throws (1:NoSuchTxnException o1)
   void commit_txn(1:CommitTxnRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2)
+  GetValidWriteIdsResponse get_valid_write_ids(1:GetValidWriteIdsRequest rqst)
+      throws (1:NoSuchTxnException o1, 2:MetaException o2)
+  AllocateTableWriteIdsResponse allocate_table_write_ids(1:AllocateTableWriteIdsRequest rqst)
+    throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:MetaException o3)
   LockResponse lock(1:LockRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2)
   LockResponse check_lock(1:CheckLockRequest rqst)
     throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:NoSuchLockException o3)

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
deleted file mode 100644
index 94b8c58..0000000
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common;
-
-import java.util.Arrays;
-import java.util.BitSet;
-
-/**
- * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- * 
- * Compaction should only include txns up to smallest open txn (exclussive).
- * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList.
- * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that inluces any unresolved
- * transactions.  Any txn above {@code highWatermark} is unresolved.
- * These produce the logic we need to assure that the compactor only sees records less than the lowest
- * open transaction when choosing which files to compact, but that it still ignores aborted
- * records when compacting.
- * 
- * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList() for proper
- * way to construct this.
- */
-public class ValidCompactorTxnList extends ValidReadTxnList {
-  public ValidCompactorTxnList() {
-    super();
-  }
-  public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) {
-    this(abortedTxnList, abortedBits, highWatermark, Long.MAX_VALUE);
-  }
-  /**
-   * @param abortedTxnList list of all aborted transactions
-   * @param abortedBits bitset marking whether the corresponding transaction is aborted
-   * @param highWatermark highest committed transaction to be considered for compaction,
-   *                      equivalently (lowest_open_txn - 1).
-   */
-  public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark, long minOpenTxnId) {
-    // abortedBits should be all true as everything in exceptions are aborted txns
-    super(abortedTxnList, abortedBits, highWatermark, minOpenTxnId);
-    if(this.exceptions.length <= 0) {
-      return;
-    }
-    //now that exceptions (aka abortedTxnList) is sorted
-    int idx = Arrays.binarySearch(this.exceptions, highWatermark);
-    int lastElementPos;
-    if(idx < 0) {
-      int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
-      lastElementPos = insertionPoint - 1;
-    }
-    else {
-      lastElementPos = idx;
-    }
-    /*
-     * ensure that we throw out any exceptions above highWatermark to make
-     * {@link #isTxnValid(long)} faster 
-     */
-    this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
-  }
-  public ValidCompactorTxnList(String value) {
-    super(value);
-  }
-  /**
-   * Returns org.apache.hadoop.hive.common.ValidTxnList.RangeResponse.ALL if all txns in
-   * the range are resolved and RangeResponse.NONE otherwise
-   */
-  @Override
-  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
-    return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
-  }
-
-  @Override
-  public boolean isTxnAborted(long txnid) {
-    return Arrays.binarySearch(exceptions, txnid) >= 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
new file mode 100644
index 0000000..9f6cf47
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * An implementation of {@link ValidWriteIdList} for use by the compactor.
+ *
+ * Compaction should only include write ids up to the smallest open write id (exclusive).
+ * There may be aborted write ids in the snapshot represented by this ValidCompactorWriteIdList.
+ * Thus {@link #isWriteIdRangeValid(long, long)} returns NONE for any range that includes any unresolved
+ * write ids.  Any write id above {@code highWatermark} is unresolved.
+ * These produce the logic we need to assure that the compactor only sees records less than the lowest
+ * open write id when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ *
+ * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList() for proper
+ * way to construct this.
+ */
+public class ValidCompactorWriteIdList extends ValidReaderWriteIdList {
+  public ValidCompactorWriteIdList() {
+    super();
+  }
+  public ValidCompactorWriteIdList(String tableName, long[] abortedWriteIdList, BitSet abortedBits,
+                                   long highWatermark) {
+    this(tableName, abortedWriteIdList, abortedBits, highWatermark, Long.MAX_VALUE);
+  }
+  /**
+   * @param tableName table which is under compaction. Full name of format {@code <db_name>.<table_name>}
+   * @param abortedWriteIdList list of all aborted write ids
+   * @param abortedBits bitset marking whether the corresponding write id is aborted
+   * @param highWatermark highest committed write id to be considered for compaction,
+   *                      equivalently (lowest_open_write_id - 1).
+   * @param minOpenWriteId minimum write id which maps to an open transaction
+   */
+  public ValidCompactorWriteIdList(String tableName, long[] abortedWriteIdList, BitSet abortedBits,
+                                   long highWatermark, long minOpenWriteId) {
+    // abortedBits should be all true as everything in exceptions are aborted write ids
+    super(tableName, abortedWriteIdList, abortedBits, highWatermark, minOpenWriteId);
+    if(this.exceptions.length <= 0) {
+      return;
+    }
+    //now that exceptions (aka abortedWriteIdList) is sorted
+    int idx = Arrays.binarySearch(this.exceptions, highWatermark);
+    int lastElementPos;
+    if(idx < 0) {
+      int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
+      lastElementPos = insertionPoint - 1;
+    }
+    else {
+      lastElementPos = idx;
+    }
+    /*
+     * ensure that we throw out any exceptions above highWatermark to make
+     * {@link #isWriteIdValid(long)} faster
+     */
+    this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
+  }
+  public ValidCompactorWriteIdList(String value) {
+    super(value);
+  }
+  /**
+   * Returns org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse.ALL if all write ids in
+   * the range are resolved and RangeResponse.NONE otherwise
+   */
+  @Override
+  public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {
+    return highWatermark >= maxWriteId ? RangeResponse.ALL : RangeResponse.NONE;
+  }
+
+  @Override
+  public boolean isWriteIdAborted(long writeId) {
+    return Arrays.binarySearch(exceptions, writeId) >= 0; // any id present in exceptions is reported as aborted
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index ccdd4b7..dd432d9 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -59,24 +59,16 @@ public class ValidReadTxnList implements ValidTxnList {
 
   @Override
   public boolean isTxnValid(long txnid) {
-    if (highWatermark < txnid) {
+    if (txnid > highWatermark) {
       return false;
     }
     return Arrays.binarySearch(exceptions, txnid) < 0;
   }
 
-  /**
-   * We cannot use a base file if its range contains an open txn.
-   * @param txnid from base_xxxx
-   */
-  @Override
-  public boolean isValidBase(long txnid) {
-    return minOpenTxn > txnid && txnid <= highWatermark;
-  }
   @Override
   public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
     // check the easy cases first
-    if (highWatermark < minTxnId) {
+    if (minTxnId > highWatermark) {
       return RangeResponse.NONE;
     } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) {
       return RangeResponse.ALL;