Posted to commits@hive.apache.org by ek...@apache.org on 2016/05/05 20:37:16 UTC

[2/3] hive git commit: HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c0fa97a..06cd4aa 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -72,7 +72,7 @@ import java.util.regex.Pattern;
  * used to properly sequence operations.  Most notably:
  * 1. various sequence IDs are generated with aid of this mutex
  * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  Transaction state
- *  includes it's actual state (Open, Aborted) as well as it's lock list/component list.  Thus all
+ *  includes its actual state (Open, Aborted) as well as its lock list/component list.  Thus all
  *  per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row.
  *  This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
 * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock
@@ -126,6 +126,41 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   static private DataSource connPool;
   static private boolean doRetryOnConnPool = false;
+  
+  private enum OperationType {
+    INSERT('i'), UPDATE('u'), DELETE('d');
+    private final char sqlConst;
+    OperationType(char sqlConst) {
+      this.sqlConst = sqlConst;
+    }
+    public String toString() {
+      return Character.toString(sqlConst);
+    }
+    public static OperationType fromString(char sqlConst) {
+      switch (sqlConst) {
+        case 'i':
+          return INSERT;
+        case 'u':
+          return UPDATE;
+        case 'd':
+          return DELETE;
+        default:
+          throw new IllegalArgumentException(quoteChar(sqlConst));
+      }
+    }
+    //we should instead just pass in OperationType from the client (HIVE-13622)
+    @Deprecated
+    public static OperationType fromLockType(LockType lockType) {
+      switch (lockType) {
+        case SHARED_READ:
+          return INSERT;
+        case SHARED_WRITE:
+          return UPDATE;
+        default:
+          throw new IllegalArgumentException("Unexpected lock type: " + lockType);
+      }
+    }
+  }
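For illustration only (not part of the patch), the round trip this enum provides between a lock type, the enum value, and its single-char SQL constant:

    // Illustrative usage of the enum added above:
    OperationType op = OperationType.fromLockType(LockType.SHARED_WRITE); // UPDATE
    char c = op.toString().charAt(0);                                     // 'u'
    assert OperationType.fromString(c) == OperationType.UPDATE;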
 
   /**
    * Number of consecutive deadlocks we have seen
@@ -454,6 +489,31 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  /**
+   * Concurrency/isolation notes:
+   * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
+   * operations using select4update on NEXT_TXN_ID.  Also, mutexes on the TXNS table for specific txnid:X
+   * see more notes below.
+   * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
+   * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
+   * so that we can compare commit time of txn T with start time of txn S.  This sequence can be thought of
+   * as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
+   *
+   * Motivating example:
+   * Suppose we have multi-statement transactions T and S, both of which are attempting x = x + 1.
+   * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot
+   * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
+   * (assuming they write the same entity), and thus the 2nd must see changes of the 1st.  We ensure
+   * this by locking in the snapshot after the
+   * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
+   * and mutexing openTxn() with commit().  In other words, once a S.commit() starts we must ensure
+   * that txn T which will be considered a later txn, locks in a snapshot that includes the result
+   * of S's commit (assuming no other txns).
+   * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
+   * were running in parallel).  If T and S both locked in the same snapshot (for example commit of
+   * txnid:2, which is possible if commitTxn() and openTxn() are not mutexed)
+   * 'x' would be updated to the same value by both, i.e. lost update. 
+   */
   public void commitTxn(CommitTxnRequest rqst)
     throws NoSuchTxnException, TxnAbortedException,  MetaException {
     long txnid = rqst.getTxnid();
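To make the interval reasoning in the javadoc above concrete, here is a minimal, hypothetical sketch (not part of the patch) of the overlap test that commitTxn() encodes in SQL; both endpoints are drawn from the NEXT_TXN_ID logical clock:

    // Illustrative only: each txn is an interval [txnId, commitId] on the
    // NEXT_TXN_ID logical clock.
    final class TxnInterval {
      final long txnId;    // start: allocated by openTxns()
      final long commitId; // end: "ntxn_next - 1" read under S4U at commit time
      TxnInterval(long txnId, long commitId) {
        this.txnId = txnId;
        this.commitId = commitId;
      }
      // committed.commitId < current.txnId means current started after committed
      // finished, i.e. the two do NOT overlap and no conflict check is needed
      boolean overlapsCommitted(TxnInterval committed) {
        return this.txnId <= committed.commitId;
      }
    }
    // [17,20] committed, [6,80] committing now: 6 <= 20, overlap
    // [17,20] committed, [21,21] committing now: 21 > 20, no overlap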
@@ -461,40 +521,116 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       Connection dbConn = null;
       Statement stmt = null;
       ResultSet lockHandle = null;
+      ResultSet commitIdRs = null, rs;
       try {
         lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        /**
+         * This S4U will mutex with other commitTxn() and openTxns(). 
+         * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
+         * Note: it's possible to have several txns have the same commit id.  Suppose 3 txns start
+         * at the same time and no new txns start until all 3 commit.
+         * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
+         */
+        commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
+        if(!commitIdRs.next()) {
+          throw new IllegalStateException("No rows found in NEXT_TXN_ID");
+        }
+        long commitId = commitIdRs.getLong(1);
         /**
          * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures that no other
         * operation can change this txn (such as acquiring locks). While lock() and commitTxn()
         * should not normally run concurrently (for the same txn), they could due to bugs in the client
          * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
          */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
         lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
         if(lockHandle == null) {
           //this also ensures that txn is still there and in expected state (hasn't been timed out)
           ensureValidTxn(dbConn, txnid, stmt);
           shouldNeverHappen(txnid);
         }
-
+        Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+        int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
+          " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " +
+          "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")");
+        if(numCompsWritten == 0) {
+          /**
+           * current txn didn't update/delete anything (may have inserted), so just proceed with commit
+           * 
+           * We only care about commit id for write txns, so for RO (when supported) txns we don't
+           * have to mutex on NEXT_TXN_ID.
+           * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
+           * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
+           * If RO < W, then there is no reads-from relationship.
+           */
+        }
+        else {
+          /**
+           * see if there are any overlapping txns that wrote the same element, i.e. have a conflict
+           * Since entire commit operation is mutexed wrt other start/commit ops,
+           * committed.ws_commit_id <= current.ws_commit_id for all txns
+           * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
+           * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+           * [17,20] committed and [21,21] committing now - these do not overlap.
+           * [17,18] committed and [18,19] committing now - these overlap  (here 18 started while 17 was still running)
+           */
+          rs = stmt.executeQuery
+            (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
+              "committed.ws_table, committed.ws_partition, cur.ws_commit_id " + 
+              "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
+            "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
+              //For partitioned table we always track writes at partition level (never at table)
+              //and for non partitioned - always at table level, thus the same table should never
+              //have entries with partition key and w/o
+            "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
+            "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
+              // with txnid, though any decent DB should infer this
+            " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
+              // part of this commitTxn() op
+            " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
+              //U+U and U+D are conflicts but D+D is not, and we don't currently track I in WRITE_SET at all
+              " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) +
+                    " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")"));
+          if(rs.next()) {
+            //found a conflict
+            String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+            StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+            String partitionName = rs.getString(5);
+            if(partitionName != null) {
+              resource.append('/').append(partitionName);
+            }
+            String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
+              " committed by " + committedTxn;
+            close(rs);
+            //remove WRITE_SET info for current txn since it's about to abort
+            dbConn.rollback(undoWriteSetForCurrentTxn);
+            LOG.info(msg);
+            //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+            if(abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
+              throw new IllegalStateException(msg + " FAILED!");
+            }
+            dbConn.commit();
+            close(null, stmt, dbConn);
+            throw new TxnAbortedException(msg);
+          }
+          else {
+            //no conflicting operations, proceed with the rest of commit sequence
+          }
+        }
         // Move the record from txn_components into completed_txn_components so that the compactor
         // knows where to look to compact.
         String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
           "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute insert <" + s + ">");
         if (stmt.executeUpdate(s) < 1) {
-          //this can be reasonable for an empty txn START/COMMIT
+          //this can be reasonable for an empty txn START/COMMIT or read-only txn
           LOG.info("Expected to move at least one record from txn_components to " +
             "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
         }
-
-        // Always access TXN_COMPONENTS before HIVE_LOCKS;
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
-        // Always access HIVE_LOCKS before TXNS
         s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
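The operation-type filter in the WRITE_SET conflict check above can be summarized by a small predicate; the following is an illustrative sketch (not part of the patch) over the single-char constants stored in ws_operation_type:

    // Illustrative only: two overlapping txns that wrote the same entity
    // conflict iff at least one side performed an UPDATE.  DELETE+DELETE is
    // not a conflict, and INSERTs are not tracked in WRITE_SET at all.
    static boolean isWriteWriteConflict(char committedOp, char currentOp) {
      return committedOp == 'u' || currentOp == 'u';
    }
    // isWriteWriteConflict('u', 'u') -> true   U+U
    // isWriteWriteConflict('u', 'd') -> true   U+D
    // isWriteWriteConflict('d', 'd') -> false  D+D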
@@ -510,6 +646,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
+        close(commitIdRs);
         close(lockHandle, stmt, dbConn);
         unlockInternal();
       }
@@ -517,7 +654,50 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       commitTxn(rqst);
     }
   }
-
+  @Override
+  public void performWriteSetGC() {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      stmt = dbConn.createStatement();
+      rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+      if(!rs.next()) {
+        throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
+      }
+      long highestAllocatedTxnId = rs.getLong(1);
+      close(rs);
+      rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
+      if(!rs.next()) {
+        throw new IllegalStateException("Scalar query returned no rows?!?!!");
+      }
+      long commitHighWaterMark;//all currently open txns (if any) have txnid >= commitHighWaterMark
+      long lowestOpenTxnId = rs.getLong(1);
+      if(rs.wasNull()) {
+        //if here then there are no open txns and highestAllocatedTxnId must be
+        //resolved (i.e. committed or aborted); either way
+        //there are no open txns with id <= highestAllocatedTxnId.
+        //the +1 is there because "delete ..." below uses < (which is correct for the case when
+        //there is an open txn)
+        //Concurrency: even if new txn starts (or starts + commits) it is still true that
+        //there are no currently open txns that overlap with any committed txn with 
+        //commitId <= commitHighWaterMark (as set on next line).  So plain READ_COMMITTED is enough.
+        commitHighWaterMark = highestAllocatedTxnId + 1;
+      }
+      else {
+        commitHighWaterMark = lowestOpenTxnId;
+      }
+      int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
+      LOG.info("Deleted " + delCnt + " obsolete rows from WRITE_SET");
+      dbConn.commit();
+    } catch (SQLException ex) {
+      LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
+    }
+    finally {
+      close(rs, stmt, dbConn);
+    }
+  }
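The watermark arithmetic in performWriteSetGC() above can be isolated as follows; a sketch for illustration only, using a nullable Long to stand in for the SQL null returned by min(txn_id) when no txn is open:

    // Illustrative only: WRITE_SET rows with ws_commit_id below this watermark
    // cannot overlap any currently open txn and are safe to delete.
    static long commitHighWaterMark(long highestAllocatedTxnId, Long lowestOpenTxnId) {
      if (lowestOpenTxnId == null) {
        // no open txns: everything up to highestAllocatedTxnId is resolved;
        // +1 because the delete uses a strict "<"
        return highestAllocatedTxnId + 1;
      }
      // open txns exist: keep every row that a txn with
      // id >= lowestOpenTxnId could still conflict with
      return lowestOpenTxnId;
    }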
   /**
    * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
   * connection (but separate transactions).  This avoids some flakiness in BONECP where if you
@@ -545,7 +725,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   /**
    * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read
-   * and then executeUpdate().  One other alternative would be to actually update the row in TXNX but
+   * and then executeUpdate().  One other alternative would be to actually update the row in TXNS but
    * to the same value as before thus forcing db to acquire write lock for duration of the transaction.
    *
    * There is no real reason to return the ResultSet here other than to make sure the reference to it
@@ -616,6 +796,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         stmt.executeUpdate(s);
 
         if (txnid > 0) {
+          /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get)
+           * So if we add that to LockRequest we'll know that here 
+           * Should probably add it to LockComponent so that in the future we can allow 1 LockRequest
+           * to contain LockComponents for multiple operations.
+           * Deriving it from lock info doesn't distinguish between Update and Delete
+           * 
+           * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
+           * FileSinkDesc.table is ql.metadata.Table
+           * Table.tableSpec which is TableSpec, which has specType which is SpecType
+           * So maybe this can be used to detect that this is part of a dynamic partition insert, in which case
+           * we'll get an addDynamicPartitions() call and should not write TXN_COMPONENTS here.
+           * In any case, that's an optimization for now;  will be required when adding multi-stmt txns
+           */
           // For each component in this lock request,
           // add an entry to the txn_components table
           // This must be done before HIVE_LOCKS is accessed
@@ -624,10 +817,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             String tblName = lc.getTablename();
             String partName = lc.getPartitionname();
             s = "insert into TXN_COMPONENTS " +
-              "(tc_txnid, tc_database, tc_table, tc_partition) " +
+              "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " +
               "values (" + txnid + ", '" + dbName + "', " +
               (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-              (partName == null ? "null" : "'" + partName + "'") + ")";
+              (partName == null ? "null" : "'" + partName + "'")+ "," +
+              quoteString(OperationType.fromLockType(lc.getType()).toString()) + ")";
             LOG.debug("Going to execute update <" + s + ">");
             stmt.executeUpdate(s);
           }
@@ -698,9 +892,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         lockInternal();
         if(dbConn.isClosed()) {
           //should only get here if retrying this op
-          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         }
-        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -756,7 +949,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         //todo: strictly speaking there is a bug here.  heartbeat*() commits but both heartbeat and
         //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired
         //extra heartbeat is logically harmless, but ...
-        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -1162,11 +1354,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
       + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
   }
+
   public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet lockHandle = null;
+    ResultSet rs = null;
     try {
       try {
         lockInternal();
@@ -1178,18 +1372,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
           shouldNeverHappen(rqst.getTxnid());
         }
+        //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to
+        //for multi-stmt txns if the same table is written more than once per txn
+        // MoveTask knows if it's I/U/D
+        // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions()
+        // which ends up here so we'd need to add a field to AddDynamicPartitions.
+        String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid()
+          + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+        //do limit 1 on this; currently they will all have the same operation type
+        rs = stmt.executeQuery(addLimitClause(1, findOperationType));
+        if(!rs.next()) {
+          throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid()));
+        }
+        OperationType ot = OperationType.fromString(rs.getString(1).charAt(0));
+        
+        //what if a txn writes the same table > 1 time... let's go with this for now, but really
+        //need to not write this in the first place, i.e. make this delete not needed
+        //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
+        String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
+          quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+        //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
+        //much "wider" than necessary in a lot of cases.  Here on the other hand, we know exactly which
+        //partitions have been written to.  w/o this WRITE_SET would contain entries for partitions not actually
+        //written to
+        stmt.executeUpdate(deleteSql);
         for (String partName : rqst.getPartitionnames()) {
-          StringBuilder buff = new StringBuilder();
-          buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values (");
-          buff.append(rqst.getTxnid());
-          buff.append(", '");
-          buff.append(rqst.getDbname());
-          buff.append("', '");
-          buff.append(rqst.getTablename());
-          buff.append("', '");
-          buff.append(partName);
-          buff.append("')");
-          String s = buff.toString();
+          String s =
+            "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" +
+              rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+              "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")";
           LOG.debug("Going to execute update <" + s + ">");
           stmt.executeUpdate(s);
         }
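A small worked illustration (hypothetical values) of the delete-then-reinsert above, for a txn that locked a table but only wrote two of its partitions:

    // Before addDynamicPartitions(): enqueueLockWithRetry() recorded the
    // lock-time view, which is wider than what was actually written:
    //   TXN_COMPONENTS: (tc_txnid=7, 'mydb', 'mytable', NULL, 'u')
    // After the delete + per-partition inserts above:
    //   TXN_COMPONENTS: (tc_txnid=7, 'mydb', 'mytable', 'ds=today',     'u')
    //                   (tc_txnid=7, 'mydb', 'mytable', 'ds=yesterday', 'u')
    // so commitTxn() copies only partitions actually written into WRITE_SET.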
@@ -1908,60 +2119,113 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     return txnId != 0;
   }
   /**
+   * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+   * hl_lock_ext_id by only checking earlier locks.
+   *
+   * For any given SQL statement all locks required by it are grouped under a single extLockId and are
+   * granted all at once or all locks wait.
+   *
+   * This is expected to run at READ_COMMITTED.
+   *
    * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
    * all locks for given extLockId or none.  Would be more efficient to update state on all locks
-   * at once.  Semantics are the same since this is all part of the same txn@serializable.
+   * at once.  Semantics are the same since this is all part of the same txn.
    *
-   * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
-   * hl_lock_ext_id by only checking earlier locks.
+   * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS.
+   * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid,
+   * checkLock() will in the worst case keep locks in Waiting state a little longer.
    */
-  private LockResponse checkLock(Connection dbConn,
-                                 long extLockId)
+  private LockResponse checkLock(Connection dbConn, long extLockId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
-    if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) {
-      //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations
-      //that would be less prone to deadlocks
-      throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation());
-    }
-    List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+    TxnStore.MutexAPI.LockHandle handle =  null;
+    Statement stmt = null;
+    ResultSet rs = null;
     LockResponse response = new LockResponse();
-    response.setLockid(extLockId);
-
-    LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
-    Savepoint save = dbConn.setSavepoint();//todo: get rid of this
-    StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
-      "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
-      "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
-
-    Set<String> strings = new HashSet<String>(locksBeingChecked.size());
-    for (LockInfo info : locksBeingChecked) {
-      strings.add(info.db);
-    }
-    boolean first = true;
-    for (String s : strings) {
-      if (first) first = false;
-      else query.append(", ");
-      query.append('\'');
-      query.append(s);
-      query.append('\'');
-    }
-    query.append(")");
-
-    // If any of the table requests are null, then I need to pull all the
-    // table locks for this db.
-    boolean sawNull = false;
-    strings.clear();
-    for (LockInfo info : locksBeingChecked) {
-      if (info.table == null) {
-        sawNull = true;
-        break;
-      } else {
-        strings.add(info.table);
+    /**
+     * todo: Longer term we should pass this from client somehow - this would be an optimization;  once
+     * that is in place make sure to build and test "writeSet" below using OperationType not LockType
+     */
+    boolean isPartOfDynamicPartitionInsert = true;
+    try {
+      /**
+       * checkLock() must be mutexed against any other checkLock to make sure 2 conflicting locks
+       * are not granted by parallel checkLock() calls.
+       */
+      handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name());
+      List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+      response.setLockid(extLockId);
+
+      LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
+      Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+      StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
+        "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
+        "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
+
+      Set<String> strings = new HashSet<String>(locksBeingChecked.size());
+
+      //This is the set of entities that the statement represented by extLockId wants to update
+      List<LockInfo> writeSet = new ArrayList<>();
+
+      for (LockInfo info : locksBeingChecked) {
+        strings.add(info.db);
+        if(!isPartOfDynamicPartitionInsert && info.type == LockType.SHARED_WRITE) {
+          writeSet.add(info);
+        }
       }
-    }
-    if (!sawNull) {
-      query.append(" and (hl_table is null or hl_table in(");
-      first = true;
+      if(!writeSet.isEmpty()) {
+        if(writeSet.get(0).txnId == 0) {
+          //Write operations always start a txn
+          throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid");
+        }
+        stmt = dbConn.createStatement();
+        StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " +
+          "ws_txnid, ws_commit_id " +
+          "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality
+        for(LockInfo info : writeSet) {
+          sb.append("(ws_database = ").append(quoteString(info.db)).append(" and ws_table = ")
+            .append(quoteString(info.table)).append(" and ws_partition ")
+            .append(info.partition == null ? "is null" : "= " + quoteString(info.partition)).append(") or ");
+        }
+        sb.setLength(sb.length() - 4);//nuke trailing " or "
+        sb.append(")");
+        //1 row is sufficient to know we have to kill the query
+        rs = stmt.executeQuery(addLimitClause(1, sb.toString()));
+        if(rs.next()) {
+          /**
+           * if here, it means we found an already committed txn which overlaps with the current one and
+           * it updated the same resource the current txn wants to update.  By the first-committer-wins
+           * rule, the current txn will not be allowed to commit, so we may as well kill it now; this is just an
+           * optimization to prevent wasting cluster resources on a query which is known to be DOA.
+           * {@link #commitTxn(CommitTxnRequest)} has the primary responsibility to ensure this.
+           * checkLock() runs at READ_COMMITTED so you could have another (Hive) txn running commitTxn()
+           * in parallel and thus writing to WRITE_SET.  commitTxn() logic is properly mutexed to ensure
+           * that we don't "miss" any WW conflicts. We could've mutexed the checkLock() and commitTxn()
+           * as well but this reduces concurrency for very little gain.
+           * Note that update/delete (which runs as dynamic partition insert) acquires a lock on the table,
+           * but WRITE_SET has entries for actual partitions updated.  Thus this optimization will "miss"
+           * the WW conflict but it will be caught in commitTxn() where actual partitions written are known.
+           * This is OK since we want 2 concurrent updates that update different sets of partitions to both commit.
+           */
+          String resourceName = rs.getString(1) + '/' + rs.getString(2);
+          String partName = rs.getString(3);
+          if(partName != null) {
+            resourceName += '/' + partName;
+          }
+
+          String msg = "Aborting " + JavaUtils.txnIdToString(writeSet.get(0).txnId) +
+            " since a concurrent committed transaction [" + JavaUtils.txnIdToString(rs.getLong(4)) + "," + rs.getLong(5) +
+            "] has already updated resource '" + resourceName + "'";
+          LOG.info(msg);
+          if(abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId)) != 1) {
+            throw new IllegalStateException(msg + " FAILED!");
+          }
+          dbConn.commit();
+          throw new TxnAbortedException(msg);
+        }
+        close(rs, stmt, null);
+      }
+
+      boolean first = true;
       for (String s : strings) {
         if (first) first = false;
         else query.append(", ");
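The all-or-nothing grant and fairness rules from the checkLock() javadoc above can be sketched as follows (illustrative only; conflicts() is a hypothetical stand-in for the lock-compatibility jump-table logic further down):

    // Illustrative only: all intLockIds under one extLockId are granted
    // together or the whole request waits, and a request is only ever blocked
    // by locks enqueued before it (the "hl_lock_ext_id <= extLockId" filter).
    static LockState checkAll(List<LockInfo> locksBeingChecked,
                              SortedSet<LockInfo> earlierLocks) {
      for (LockInfo mine : locksBeingChecked) {
        for (LockInfo other : earlierLocks) {   // filtered to earlier ext ids
          if (conflicts(mine, other)) {
            return LockState.WAITING;           // one blocked lock blocks all
          }
        }
      }
      return LockState.ACQUIRED;                // grant the whole request
    }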
@@ -1969,22 +2233,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         query.append(s);
         query.append('\'');
       }
-      query.append("))");
+      query.append(")");
 
-      // If any of the partition requests are null, then I need to pull all
-      // partition locks for this table.
-      sawNull = false;
+      // If any of the table requests are null, then I need to pull all the
+      // table locks for this db.
+      boolean sawNull = false;
       strings.clear();
       for (LockInfo info : locksBeingChecked) {
-        if (info.partition == null) {
+        if (info.table == null) {
           sawNull = true;
           break;
         } else {
-          strings.add(info.partition);
+          strings.add(info.table);
         }
       }
       if (!sawNull) {
-        query.append(" and (hl_partition is null or hl_partition in(");
+        query.append(" and (hl_table is null or hl_table in(");
         first = true;
         for (String s : strings) {
           if (first) first = false;
@@ -1994,14 +2258,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           query.append('\'');
         }
         query.append("))");
+
+        // If any of the partition requests are null, then I need to pull all
+        // partition locks for this table.
+        sawNull = false;
+        strings.clear();
+        for (LockInfo info : locksBeingChecked) {
+          if (info.partition == null) {
+            sawNull = true;
+            break;
+          } else {
+            strings.add(info.partition);
+          }
+        }
+        if (!sawNull) {
+          query.append(" and (hl_partition is null or hl_partition in(");
+          first = true;
+          for (String s : strings) {
+            if (first) first = false;
+            else query.append(", ");
+            query.append('\'');
+            query.append(s);
+            query.append('\'');
+          }
+          query.append("))");
+        }
       }
-    }
-    query.append(" and hl_lock_ext_id <= ").append(extLockId);
+      query.append(" and hl_lock_ext_id <= ").append(extLockId);
 
-    LOG.debug("Going to execute query <" + query.toString() + ">");
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
+      LOG.debug("Going to execute query <" + query.toString() + ">");
       stmt = dbConn.createStatement();
       rs = stmt.executeQuery(query.toString());
       SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
@@ -2117,6 +2402,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       response.setState(LockState.ACQUIRED);
     } finally {
       close(rs, stmt, null);
+      if(handle != null) {
+        handle.releaseLocks();
+      }
     }
     return response;
   }
@@ -2158,7 +2446,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
       //if lock is part of txn, heartbeat info is in txn record
       "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
-    ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+    ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " +
       extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
@@ -2238,6 +2526,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       //todo: add LIMIT 1 instead of count - should be more efficient
       s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
       ResultSet rs2 = stmt.executeQuery(s);
+      //todo: strictly speaking you can commit an empty txn, thus 2nd conjunct is wrong but only
+      //possible for multi-stmt txns
       boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
       LOG.debug("Going to rollback");
       dbConn.rollback();

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 927e9bc..f9cac18 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -47,7 +47,7 @@ import java.util.Set;
 @InterfaceStability.Evolving
 public interface TxnStore {
 
-  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
+  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, WriteSetCleaner}
   // Compactor states (Should really be enum)
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
@@ -321,6 +321,12 @@ public interface TxnStore {
   public void purgeCompactionHistory() throws MetaException;
 
   /**
+   * WriteSet tracking is used to ensure proper transaction isolation.  This method deletes the 
+   * transaction metadata once it becomes unnecessary.  
+   */
+  public void performWriteSetGC();
+
+  /**
    * Determine if there are enough consecutive failures compacting a table or partition that no
    * new automatic compactions should be scheduled.  User initiated compactions do not do this
    * check.

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index cc9e583..b829d9d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -69,6 +69,8 @@ public class TxnUtils {
    * @return a valid txn list.
    */
   public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+    //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" +
+    // quoteChar(TXN_OPEN) to compute HWM...
     long highWater = txns.getTxn_high_water_mark();
     long minOpenTxn = Long.MAX_VALUE;
     long[] exceptions = new long[txns.getOpen_txnsSize()];
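One hedged reading of the todo above, reusing the min(txn_id) query that performWriteSetGC() in TxnHandler runs (sketch only; stmt, quoteChar and TXN_OPEN as used there):

    // Illustrative only: let the DB compute the lowest open txn id instead of
    // scanning the full open-txn list client side.
    ResultSet rs = stmt.executeQuery(
        "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN));
    long minOpenTxn = Long.MAX_VALUE;
    if (rs.next()) {
      long v = rs.getLong(1);
      if (!rs.wasNull()) {
        minOpenTxn = v;   // SQL null means no open txns; keep the sentinel
      }
    }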

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 2c1560b..80e3cd6 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -413,7 +413,7 @@ public class TestCompactionTxnHandler {
     lc.setTablename(tableName);
     LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
     lr.setTxnid(txnId);
-    LockResponse lock = txnHandler.lock(new LockRequest(Arrays.asList(lc), "me", "localhost"));
+    LockResponse lock = txnHandler.lock(lr);
     assertEquals(LockState.ACQUIRED, lock.getState());
 
     txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName,
@@ -429,8 +429,8 @@ public class TestCompactionTxnHandler {
       assertEquals(dbName, ci.dbname);
       assertEquals(tableName, ci.tableName);
       switch (i++) {
-      case 0: assertEquals("ds=today", ci.partName); break;
-      case 1: assertEquals("ds=yesterday", ci.partName); break;
+        case 0: assertEquals("ds=today", ci.partName); break;
+        case 1: assertEquals("ds=yesterday", ci.partName); break;
       default: throw new RuntimeException("What?");
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 28d0269..1a118a9 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -483,6 +483,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
   }
@@ -514,6 +515,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
   }
@@ -580,6 +582,7 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     LockResponse res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
 
@@ -602,6 +605,7 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     LockResponse res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
 
@@ -611,6 +615,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
 
@@ -633,6 +638,7 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     LockResponse res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
 
@@ -642,6 +648,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
 
@@ -651,6 +658,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
   }
@@ -682,6 +690,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
   }
@@ -725,6 +734,8 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    long txnId = openTxn();
+    req.setTxnid(txnId);
     LockResponse res = txnHandler.lock(req);
     long lockid1 = res.getLockid();
     assertTrue(res.getState() == LockState.ACQUIRED);
@@ -735,11 +746,12 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     long lockid2 = res.getLockid();
     assertTrue(res.getState() == LockState.WAITING);
 
-    txnHandler.unlock(new UnlockRequest(lockid1));
+    txnHandler.abortTxn(new AbortTxnRequest(txnId));
     res = txnHandler.checkLock(new CheckLockRequest(lockid2));
     assertTrue(res.getState() == LockState.ACQUIRED);
   }
@@ -1070,16 +1082,14 @@ public class TestTxnHandler {
   @Test
   public void showLocks() throws Exception {
     long begining = System.currentTimeMillis();
-    long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
     LockResponse res = txnHandler.lock(req);
 
     // Open txn
-    txnid = openTxn();
+    long txnid = openTxn();
     comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
     comp.setTablename("mytable");
     components = new ArrayList<LockComponent>(1);
@@ -1090,7 +1100,7 @@ public class TestTxnHandler {
 
     // Locks not associated with a txn
     components = new ArrayList<LockComponent>(1);
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "yourdb");
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
     comp.setTablename("yourtable");
     comp.setPartitionname("yourpartition");
     components.add(comp);
@@ -1104,14 +1114,13 @@ public class TestTxnHandler {
     for (int i = 0; i < saw.length; i++) saw[i] = false;
     for (ShowLocksResponseElement lock : locks) {
       if (lock.getLockid() == 1) {
-        assertEquals(1, lock.getTxnid());
+        assertEquals(0, lock.getTxnid());
         assertEquals("mydb", lock.getDbname());
         assertNull(lock.getTablename());
         assertNull(lock.getPartname());
         assertEquals(LockState.ACQUIRED, lock.getState());
         assertEquals(LockType.EXCLUSIVE, lock.getType());
-        assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
-            lock.getTxnid() != 0);
+        assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
         assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
             + " and " + System.currentTimeMillis(),
             begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1119,7 +1128,7 @@ public class TestTxnHandler {
         assertEquals("localhost", lock.getHostname());
         saw[0] = true;
       } else if (lock.getLockid() == 2) {
-        assertEquals(2, lock.getTxnid());
+        assertEquals(1, lock.getTxnid());
         assertEquals("mydb", lock.getDbname());
         assertEquals("mytable", lock.getTablename());
         assertNull(lock.getPartname());
@@ -1137,7 +1146,7 @@ public class TestTxnHandler {
         assertEquals("yourtable", lock.getTablename());
         assertEquals("yourpartition", lock.getPartname());
         assertEquals(LockState.ACQUIRED, lock.getState());
-        assertEquals(LockType.SHARED_WRITE, lock.getType());
+        assertEquals(LockType.SHARED_READ, lock.getType());
         assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
             System.currentTimeMillis() >= lock.getLastheartbeat());
         assertTrue(begining <= lock.getAcquiredat() &&

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1de3309..52dadb7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -377,7 +377,7 @@ public enum ErrorMsg {
       "instantiated, check hive.txn.manager"),
   TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction {0} could be found, " +
       "may have timed out", true),
-  TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true),
+  TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.  Reason: {1}", true),
   DBTXNMGR_REQUIRES_CONCURRENCY(10264,
       "To use DbTxnManager you must set hive.support.concurrency=true"),
   TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 7fa57d6..18ed864 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -172,8 +172,9 @@ public class DbLockManager implements HiveLockManager{
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(lock.getTxnid()));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(lock.getTxnid()));
     } catch (TxnAbortedException e) {
-      LOG.error("Transaction " + JavaUtils.txnIdToString(lock.getTxnid()) + " already aborted.");
-      throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()));
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()), e.getMessage());
+      LOG.error(le.getMessage());
+      throw le;
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 3aec8eb..9c2a346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -107,6 +107,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public long openTxn(String user) throws LockException {
+    //todo: why don't we lock the snapshot here???  Instead of having client make an explicit call
+    //whenever it chooses
     init();
     if(isTxnOpen()) {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
@@ -132,8 +134,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
-    acquireLocks(plan, ctx, username, true);
-    startHeartbeat();
+    try {
+      acquireLocks(plan, ctx, username, true);
+      startHeartbeat();
+    }
+    catch(LockException e) {
+      if(e.getCause() instanceof TxnAbortedException) {
+        txnId = 0;
+        statementId = -1;
+      }
+      throw e;
+    }
   }
 
   /**
@@ -157,7 +168,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     // For each source to read, get a shared lock
     for (ReadEntity input : plan.getInputs()) {
       if (!input.needsLock() || input.isUpdateOrDelete()) {
-        // We don't want to acquire readlocks during update or delete as we'll be acquiring write
+        // We don't want to acquire read locks during update or delete as we'll be acquiring write
         // locks instead.
         continue;
       }
@@ -320,8 +331,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
     } catch (TxnAbortedException e) {
-      LOG.error("Transaction " + JavaUtils.txnIdToString(txnId) + " aborted");
-      throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+      LOG.error(le.getMessage());
+      throw le;
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -389,8 +401,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(txnId));
         throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
       } catch (TxnAbortedException e) {
-        LOG.error("Transaction aborted " + JavaUtils.txnIdToString(txnId));
-        throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+        LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+        LOG.error(le.getMessage());
+        throw le;
       } catch (TException e) {
         throw new LockException(
             ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
new file mode 100644
index 0000000..9085a6a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Periodically cleans WriteSet tracking information used in Transaction management
+ */
+public class AcidWriteSetService extends HouseKeeperServiceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class);
+  @Override
+  protected long getStartDelayMs() {
+    return 0;
+  }
+  @Override
+  protected long getIntervalMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.WRITE_SET_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new WriteSetReaper(hiveConf, isAliveCounter);
+  }
+  @Override
+  public String getServiceDescription() {
+    return "Periodically cleans obsolete WriteSet tracking information used in Transaction management";
+  }
+  private static final class WriteSetReaper implements Runnable {
+    private final TxnStore txnHandler;
+    private final AtomicInteger isAliveCounter;
+    private WriteSetReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
+      this.isAliveCounter = isAliveCounter;
+    }
+    @Override
+    public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name());
+        long startTime = System.currentTimeMillis();
+        txnHandler.performWriteSetGC();
+        int count = isAliveCounter.incrementAndGet();
+        LOG.info("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + " seconds.  isAliveCounter=" + count);
+      }
+      catch(Throwable t) {
+        LOG.error("Serious error in {}: {}", Thread.currentThread().getName(), t.getMessage(), t);
+      }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
index 947f17c..caab10d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -81,7 +81,7 @@ public abstract class HouseKeeperServiceBase implements HouseKeeperService {
    */
   protected abstract long getStartDelayMs();
   /**
-   * Determines how fequently the service is running its task.
+   * Determines how frequently the service is running its task.
    */
   protected abstract long getIntervalMs();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index abbe5d4..949cbd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -147,7 +147,7 @@ public class Initiator extends CompactorThread {
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " +
-                  ci + ".  Marking clean to avoid repeated failures, " +
+                  ci + ".  Marking failed to avoid repeated failures, " +
                   "" + StringUtils.stringifyException(t));
               txnHandler.markFailed(ci);
             }

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 6238e2b..767c10c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -182,7 +182,7 @@ public class Worker extends CompactorThread {
           txnHandler.markCompacted(ci);
         } catch (Exception e) {
           LOG.error("Caught exception while trying to compact " + ci +
-              ".  Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
+              ".  Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
           txnHandler.markFailed(ci);
         }
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 1030987..472da0b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -669,7 +669,7 @@ public class TestTxnCommands2 {
     t.run();
   }
 
-  private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+  public static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
     int lastCount = houseKeeperService.getIsAliveCounter();
     houseKeeperService.start(conf);
     int maxIter = 10;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f87dd14..83a2ba3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -65,6 +65,26 @@ public class TestAcidUtils {
     assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
   }
+  @Test
+  public void testCreateFilenameLargeIds() throws Exception {
+    Path p = new Path("/tmp");
+    Configuration conf = new Configuration();
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+      .setOldStyle(true).bucket(123456789);
+    assertEquals("/tmp/123456789_0",
+      AcidUtils.createFilename(p, options).toString());
+    options.bucket(23)
+      .minimumTransactionId(1234567880)
+      .maximumTransactionId(1234567890)
+      .writingBase(true)
+      .setOldStyle(false);
+    assertEquals("/tmp/base_1234567890/bucket_00023",
+      AcidUtils.createFilename(p, options).toString());
+    options.writingBase(false);
+    assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
+      AcidUtils.createFilename(p, options).toString());
+  }
+  
 
   @Test
   public void testParsing() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 3a6e76e..22f7482 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.Context;
@@ -500,6 +501,12 @@ public class TestDbTxnManager {
       partCols.add(fs);
       t.setPartCols(partCols);
     }
+    Map<String, String> tblProps = t.getParameters();
+    if(tblProps == null) {
+      tblProps = new HashMap<>();
+    }
+    tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    t.setParameters(tblProps);
     return t;
   }