You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/11/17 21:19:54 UTC

[43/43] hive git commit: HIVE-11948 Investigate TxnHandler and CompactionTxnHandler to see where we improve concurrency(Eugene Koifman, reviewed by Alan Gates)

HIVE-11948 Investigate TxnHandler and CompactionTxnHandler to see where we improve concurrency(Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d78fea10
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d78fea10
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d78fea10

Branch: refs/heads/master-fixed
Commit: d78fea10233cfc1a795d2a019520bc39dffe5036
Parents: 4ca8774
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Nov 17 12:01:44 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue Nov 17 12:18:36 2015 -0800

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  |   6 +-
 .../metastore/txn/CompactionTxnHandler.java     | 125 +++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 524 ++++++++++++++-----
 .../metastore/txn/TestCompactionTxnHandler.java |  37 --
 .../hive/metastore/txn/TestTxnHandler.java      |  10 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   2 +-
 6 files changed, 464 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d78fea10/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 58cfbaa..806dbdb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -618,7 +618,7 @@ public class TestStreaming {
   }
 
   @Test
-  public void testHearbeat() throws Exception {
+  public void testHeartbeat() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, null);
@@ -632,14 +632,14 @@ public class TestStreaming {
     Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
     ShowLocksResponseElement lock = response.getLocks().get(0);
     long acquiredAt = lock.getAcquiredat();
-    long heartbeatAt = lock.getAcquiredat();
+    long heartbeatAt = lock.getLastheartbeat();
     txnBatch.heartbeat();
     response = msClient.showLocks();
     Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
     lock = response.getLocks().get(0);
     Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
     Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
-      ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt);
+      ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt);
   }
   @Test
   public void testTransactionBatchEmptyAbort() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/d78fea10/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 4d9e8ae..5e4c7be 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -56,6 +56,7 @@ public class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<CompactionInfo>();
     Statement stmt = null;
+    ResultSet rs = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -64,7 +65,7 @@ public class CompactionTxnHandler extends TxnHandler {
         String s = "select distinct ctc_database, ctc_table, " +
           "ctc_partition from COMPLETED_TXN_COMPONENTS";
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         while (rs.next()) {
           CompactionInfo info = new CompactionInfo();
           info.dbname = rs.getString(1);
@@ -72,6 +73,7 @@ public class CompactionTxnHandler extends TxnHandler {
           info.partName = rs.getString(3);
           response.add(info);
         }
+        rs.close();
 
         // Check for aborted txns
         s = "select tc_database, tc_table, tc_partition " +
@@ -97,8 +99,7 @@ public class CompactionTxnHandler extends TxnHandler {
         LOG.error("Unable to connect to transaction database " + e.getMessage());
         checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
       } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
+        close(rs, stmt, dbConn);
       }
       return response;
     }
@@ -118,7 +119,7 @@ public class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
         LOG.debug("Going to execute update <" + s + ">");
@@ -153,46 +154,58 @@ public class CompactionTxnHandler extends TxnHandler {
   public CompactionInfo findNextToCompact(String workerId) throws MetaException {
     try {
       Connection dbConn = null;
-      CompactionInfo info = new CompactionInfo();
-
       Statement stmt = null;
+      ResultSet rs = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
           "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         if (!rs.next()) {
           LOG.debug("No compactions found ready to compact");
           dbConn.rollback();
           return null;
         }
-        info.id = rs.getLong(1);
-        info.dbname = rs.getString(2);
-        info.tableName = rs.getString(3);
-        info.partName = rs.getString(4);
-        switch (rs.getString(5).charAt(0)) {
-          case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
-          case MINOR_TYPE: info.type = CompactionType.MINOR; break;
-          default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
-        }
-
-        // Now, update this record as being worked on by this worker.
-        long now = getDbTime(dbConn);
-        s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
-          "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
-        LOG.debug("Going to execute update <" + s + ">");
-        int updCount = stmt.executeUpdate(s);
-        if (updCount != 1) {
+        do {
+          CompactionInfo info = new CompactionInfo();
+          info.id = rs.getLong(1);
+          info.dbname = rs.getString(2);
+          info.tableName = rs.getString(3);
+          info.partName = rs.getString(4);
+          switch (rs.getString(5).charAt(0)) {
+            case MAJOR_TYPE:
+              info.type = CompactionType.MAJOR;
+              break;
+            case MINOR_TYPE:
+              info.type = CompactionType.MINOR;
+              break;
+            default:
+              throw new MetaException("Unexpected compaction type " + rs.getString(5));
+          }
+          // Now, update this record as being worked on by this worker.
+          long now = getDbTime(dbConn);
+          s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
+            "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
+            " AND cq_state='" + INITIATED_STATE + "'";
+          LOG.debug("Going to execute update <" + s + ">");
+          int updCount = stmt.executeUpdate(s);
+          if(updCount == 1) {
+            dbConn.commit();
+            return info;
+          }
+          if(updCount == 0) {
+            LOG.debug("Another Worker picked up " + info);
+            continue;
+          }
           LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
-            info + ". updCnt=" + updCount);
-          LOG.debug("Going to rollback");
+            info + ". updCnt=" + updCount + ".");
           dbConn.rollback();
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return info;
+          return null;
+        } while( rs.next());
+        dbConn.rollback();
+        return null;
       } catch (SQLException e) {
         LOG.error("Unable to select next element for compaction, " + e.getMessage());
         LOG.debug("Going to rollback");
@@ -201,8 +214,7 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       return findNextToCompact(workerId);
@@ -219,7 +231,7 @@ public class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
           "cq_worker_id = null where cq_id = " + info.id;
@@ -240,8 +252,8 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
         closeStmt(stmt);
+        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
       markCompacted(info);
@@ -258,6 +270,7 @@ public class CompactionTxnHandler extends TxnHandler {
     List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
 
     Statement stmt = null;
+    ResultSet rs = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -265,7 +278,7 @@ public class CompactionTxnHandler extends TxnHandler {
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
           "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         while (rs.next()) {
           CompactionInfo info = new CompactionInfo();
           info.id = rs.getLong(1);
@@ -291,8 +304,7 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       return findReadyToClean();
@@ -303,23 +315,28 @@ public class CompactionTxnHandler extends TxnHandler {
    * This will remove an entry from the queue after
    * it has been compacted.
    * 
-   * todo: possibly a problem?  Worker will start with DB in state X (wrt this partition).
+   * todo: Worker will start with DB in state X (wrt this partition).
    * while it's working more txns will happen, against partition it's compacting.
    * then this will delete state up to X and since then.  There may be new delta files created
    * between compaction starting and cleaning.  These will not be compacted until more
    * transactions happen.  So this ideally should only delete
    * up to TXN_ID that was compacted (i.e. HWM in Worker?)  Then this can also run
-   * at READ_COMMITTED
+   * at READ_COMMITTED.  So this means we'd want to store HWM in COMPACTION_QUEUE when
+   * Worker picks up the job.
    * 
    * Also, by using this method when Worker fails, we prevent future compactions from
-   * running until more data is written to tale or compaction is invoked explicitly
+   * running until more data is written to table or compaction is invoked explicitly
    * @param info info on the compaction entry to remove
    */
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet rs = null;
       try {
+        //do we need serializable?  Once we have the HWM as above, no.  Before that
+        //it's debatable, but problem described above applies either way
+        //Thus can drop to RC
         dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
@@ -344,19 +361,20 @@ public class CompactionTxnHandler extends TxnHandler {
             "marking compaction entry as clean!");
         }
 
-
+        //todo: add distinct in query
         s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
           TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
           info.tableName + "'";
         if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
         LOG.debug("Going to execute update <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         Set<Long> txnids = new HashSet<Long>();
         while (rs.next()) txnids.add(rs.getLong(1));
         if (txnids.size() > 0) {
 
           // Remove entries from txn_components, as there may be aborted txn components
           StringBuilder buf = new StringBuilder();
+          //todo: add a safeguard to make sure IN clause is not too large; break up by txn id
           buf.append("delete from TXN_COMPONENTS where tc_txnid in (");
           boolean first = true;
           for (long id : txnids) {
@@ -394,8 +412,7 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       markCleaned(info);
@@ -409,14 +426,17 @@ public class CompactionTxnHandler extends TxnHandler {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet rs = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        //Aborted is a terminal state, so nothing about the txn can change
+        //after that, so READ COMMITTED is sufficient.
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select txn_id from TXNS where " +
           "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
           "txn_state = '" + TXN_ABORTED + "'";
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         Set<Long> txnids = new HashSet<Long>();
         while (rs.next()) txnids.add(rs.getLong(1));
         if (txnids.size() > 0) {
@@ -443,8 +463,7 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       cleanEmptyAbortedTxns();
@@ -465,7 +484,7 @@ public class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
           + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
@@ -485,8 +504,8 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
         closeStmt(stmt);
+        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
       revokeFromLocalWorkers(hostname);
@@ -507,7 +526,7 @@ public class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         long latestValidStart = getDbTime(dbConn) - timeout;
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
@@ -528,8 +547,8 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
         closeStmt(stmt);
+        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
       revokeTimedoutWorkers(timeout);

http://git-wip-us.apache.org/repos/asf/hive/blob/d78fea10/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 5c5e6ff..7f8cb71 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -22,6 +22,7 @@ import com.jolbox.bonecp.BoneCPDataSource;
 import org.apache.commons.dbcp.ConnectionFactory;
 import org.apache.commons.dbcp.DriverManagerConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.tools.ant.taskdefs.Java;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -52,6 +53,11 @@ import java.util.concurrent.TimeUnit;
  * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
  * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
  * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
+ * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
+ * transaction in TXNS.
  */
 public class TxnHandler {
   // Compactor states
@@ -150,12 +156,20 @@ public class TxnHandler {
       // subsequently shows up in the open list that's ok.
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet rs = null;
       try {
+        /**
+         * This method can run at READ_COMMITTED as long as long as
+         * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
+         * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
+         * adding corresponding entries into TXNS.  The reason is that any txnid below HWM
+         * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+         */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select ntxn_next - 1 from NEXT_TXN_ID";
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         if (!rs.next()) {
           throw new MetaException("Transaction tables not properly " +
             "initialized, no record found in next_txn_id");
@@ -165,7 +179,7 @@ public class TxnHandler {
           throw new MetaException("Transaction tables not properly " +
             "initialized, null record found in next_txn_id");
         }
-
+        close(rs);
         List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
         //need the WHERE clause below to ensure consistent results with READ_COMMITTED
         s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm;
@@ -199,8 +213,7 @@ public class TxnHandler {
         throw new MetaException("Unable to select from transaction database: " + getMessage(e)
           + StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       return getOpenTxnsInfo();
@@ -215,12 +228,16 @@ public class TxnHandler {
       // subsequently shows up in the open list that's ok.
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet rs = null;
       try {
+        /**
+         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+\         */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select ntxn_next - 1 from NEXT_TXN_ID";
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         if (!rs.next()) {
           throw new MetaException("Transaction tables not properly " +
             "initialized, no record found in next_txn_id");
@@ -230,7 +247,7 @@ public class TxnHandler {
           throw new MetaException("Transaction tables not properly " +
             "initialized, null record found in next_txn_id");
         }
-
+        close(rs);
         Set<Long> openList = new HashSet<Long>();
         //need the WHERE clause below to ensure consistent results with READ_COMMITTED
         s = "select txn_id from TXNS where txn_id <= " + hwm;
@@ -249,8 +266,7 @@ public class TxnHandler {
         throw new MetaException("Unable to select from transaction database, "
           + StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       return getOpenTxns();
@@ -284,17 +300,35 @@ public class TxnHandler {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet rs = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        /**
+         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+         * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
+         * Also, advancing the counter must work when multiple metastores are running, thus either
+         * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation.  The former is preferred since it prevents
+         * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+         *
+         * In the current design, there can be several metastore instances running in a given Warehouse.
+         * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
+         * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+         * Now the same client will start another transaction, except it ends up on MS2 and may get
+         * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
+         * on read will thing the version of the row from transaction ID 500 is the latest one.
+         *
+         * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations).  This
+         * set could support a write-through cache for added performance.
+         */
+        dbConn = getDbConn(getRequiredIsolationLevel());
         // Make sure the user has not requested an insane amount of txns.
         int maxTxns = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
         if (numTxns > maxTxns) numTxns = maxTxns;
 
         stmt = dbConn.createStatement();
-        String s = "select ntxn_next from NEXT_TXN_ID";
+        String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID");
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         if (!rs.next()) {
           throw new MetaException("Transaction database not properly " +
             "configured, can't find next transaction id.");
@@ -312,10 +346,11 @@ public class TxnHandler {
         List<Long> txnIds = new ArrayList<Long>(numTxns);
         for (long i = first; i < first + numTxns; i++) {
           ps.setLong(1, i);
+          //todo: this would be more efficient with a single insert with multiple rows in values()
+          //need add a safeguard to not exceed the DB capabilities.
           ps.executeUpdate();
           txnIds.add(i);
         }
-
         LOG.debug("Going to commit");
         dbConn.commit();
         return new OpenTxnsResponse(txnIds);
@@ -326,8 +361,7 @@ public class TxnHandler {
         throw new MetaException("Unable to select from transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
+        close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
       return openTxns(rqst);
@@ -369,6 +403,24 @@ public class TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
+        /**
+         * This has to run at SERIALIZABLE to make no concurrent attempt to acquire locks (insert into HIVE_LOCKS)
+         * can happen.  Otherwise we may end up with orphaned locks.  While lock() and commitTxn() should not
+         * normally run concurrently (for same txn) but could due to bugs in the client which could then
+         * (w/o SERIALIZABLE) corrupt internal transaction manager state.  Also competes with abortTxn()
+         *
+         * Sketch of an improvement:
+         * Introduce a new transaction state in TXNS, state 'c'.  This is a transient Committed state.
+         * commitTxn() would mark the txn 'c' in TXNS in an independent txn.  Other operation like
+         * lock(), heartbeat(), etc would raise errors for txn in 'c' state and getOpenTxns(), etc would
+         * treat 'c' txn as 'open'.  Then this method could run in READ COMMITTED since the
+         * entry for this txn in TXNS in 'c' acts like a monitor.
+         * Since the move to 'c' state is in one txn (to make it visible) and the rest of the
+         * operations in another (could even be made separate txns), there is a possibility of failure
+         * between the 2.  Thus the AcidHouseKeeper logic to timeout txns should apply 'c' state txns.
+         *
+         * Or perhaps Select * TXNS where txn_id = " + txnid; for update
+         */
         dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         // Before we do the commit heartbeat the txn.  This is slightly odd in that we're going to
@@ -423,7 +475,7 @@ public class TxnHandler {
       Connection dbConn = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-        return lock(dbConn, rqst, true);
+        return lock(dbConn, rqst);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
@@ -438,48 +490,49 @@ public class TxnHandler {
     }
   }
 
-  public LockResponse lockNoWait(LockRequest rqst)
-    throws NoSuchTxnException,  TxnAbortedException, MetaException {
-    try {
-      Connection dbConn = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-        return lock(dbConn, rqst, false);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "lockNoWait(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      return lockNoWait(rqst);
-    }
-  }
-
+  /**
+   * Why doesn't this get a txnid as parameter?  The caller should either know the txnid or know there isn't one.
+   * Either way getTxnIdFromLockId() will not be needed.  This would be a Thrift change.
+   *
+   * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
+   * in practice more often)
+   * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB.
+   *
+   * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
+   * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
+   *
+   * {@link #checkLock(java.sql.Connection, long)}  must run at SERIALIZABLE (make sure some lock we are checking
+   * against doesn't move from W to A in another txn) but this method can heartbeat in
+   * separate txn at READ_COMMITTED.
+   */
   public LockResponse checkLock(CheckLockRequest rqst)
     throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
     try {
       Connection dbConn = null;
+      long extLockId = rqst.getLockid();
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-        long extLockId = rqst.getLockid();
-
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         // Heartbeat on the lockid first, to assure that our lock is still valid.
         // Then look up the lock info (hopefully in the cache).  If these locks
         // are associated with a transaction then heartbeat on that as well.
-        heartbeatLock(dbConn, extLockId);
-        long txnid = getTxnIdFromLockId(dbConn, extLockId);
-        if (txnid > 0)  heartbeatTxn(dbConn, txnid);
-        return checkLock(dbConn, extLockId, true);
+        Long txnid = getTxnIdFromLockId(dbConn, extLockId);
+        if(txnid == null) {
+          throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+        }
+        if (txnid > 0) {
+          heartbeatTxn(dbConn, txnid);
+        }
+        else {
+          heartbeatLock(dbConn, extLockId);
+        }
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
         throw new MetaException("Unable to update transaction database " +
-          StringUtils.stringifyException(e));
+          JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
@@ -489,39 +542,56 @@ public class TxnHandler {
 
   }
 
+  /**
+   * This would have been made simpler if all locks were associated with a txn.  Then only txn needs to
+   * be heartbeated, committed, etc.  no need for client to track individual locks.
+   */
   public void unlock(UnlockRequest rqst)
     throws NoSuchLockException, TxnOpenException, MetaException {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      long extLockId = rqst.getLockid();
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-        // Odd as it seems, we need to heartbeat first because this touches the
-        // lock table and assures that our locks our still valid.  If they are
-        // not, this will throw an exception and the heartbeat will fail.
-        long extLockId = rqst.getLockid();
-        heartbeatLock(dbConn, extLockId);
-        long txnid = getTxnIdFromLockId(dbConn, extLockId);
-        // If there is a valid txnid, throw an exception,
-        // as locks associated with transactions should be unlocked only when the
-        // transaction is committed or aborted.
-        if (txnid > 0) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          String msg = "Unlocking locks associated with transaction" +
-            " not permitted.  Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
-            "transaction " + JavaUtils.txnIdToString(txnid);
-          LOG.error(msg);
-          throw new TxnOpenException(msg);
-        }
+        /**
+         * This method is logically like commit for read-only auto commit queries.
+         * READ_COMMITTED since this only has 1 delete statement and no new entries with the
+         * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
+         * created in a single atomic operation.
+         * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
+         * but hl_lock_ext_id is not known until that method returns.
+         * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
+         * but using SERIALIZABLE doesn't materially change the interaction.
+         * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg.
+         */
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId;
+        //hl_txnid <> 0 means it's associated with a transaction
+        String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid = 0";
         LOG.debug("Going to execute update <" + s + ">");
         int rc = stmt.executeUpdate(s);
         if (rc < 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
-          throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+          Long txnid = getTxnIdFromLockId(dbConn, extLockId);
+          if(txnid == null) {
+            LOG.error("No lock found for unlock(" + rqst + ")");
+            throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+          }
+          if(txnid != 0) {
+            String msg = "Unlocking locks associated with transaction" +
+              " not permitted.  Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
+              "transaction " + JavaUtils.txnIdToString(txnid);
+            LOG.error(msg);
+            throw new TxnOpenException(msg);
+          }
+          if(txnid == 0) {
+            //we didn't see this lock when running DELETE stmt above but now it showed up
+            //so should "should never happen" happened...
+            String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid);
+            LOG.error(msg);
+            throw new MetaException(msg);
+          }
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -530,7 +600,7 @@ public class TxnHandler {
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "unlock(" + rqst + ")");
         throw new MetaException("Unable to update transaction database " +
-          StringUtils.stringifyException(e));
+          JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
@@ -615,6 +685,10 @@ public class TxnHandler {
     }
   }
 
+  /**
+   * {@code ids} should only have txnid or lockid but not both, ideally.
+   * Currently DBTxnManager.heartbeat() enforces this.
+   */
   public void heartbeat(HeartbeatRequest ids)
     throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException {
     try {
@@ -647,9 +721,17 @@ public class TxnHandler {
       rsp.setNosuch(nosuch);
       rsp.setAborted(aborted);
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        /**
+         * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)}
+         * only has 1 update statement in it and
+         * we only update existing txns, i.e. nothing can add additional txns that this operation
+         * would care about (which would have required SERIALIZABLE)
+         */
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
           try {
+            //todo: this is expensive call: at least 2 update queries per txn
+            //is this really worth it?
             heartbeatTxn(dbConn, txn);
           } catch (NoSuchTxnException e) {
             nosuch.add(txn);
@@ -678,11 +760,11 @@ public class TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(getRequiredIsolationLevel());
         stmt = dbConn.createStatement();
 
         // Get the id for the next entry in the queue
-        String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID";
+        String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID");
         LOG.debug("going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -1293,19 +1375,31 @@ public class TxnHandler {
     }
   }
 
+  private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+    return abortTxns(dbConn, txnids, -1);
+  }
   /**
-   * Abort a group of txns
+   * TODO: expose this as an operation to client.  Useful for streaming API to abort all remaining
+   * trasnactions in a batch on IOExceptions.
    * @param dbConn An active connection
    * @param txnids list of transactions to abort
+   * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were
+   *                      hearbetated after #performTimeOuts() select and this operation.
    * @return Number of aborted transactions
    * @throws SQLException
    */
-  private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+  private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat) throws SQLException {
     Statement stmt = null;
     int updateCnt = 0;
     if (txnids.isEmpty()) {
       return 0;
     }
+    if(Connection.TRANSACTION_SERIALIZABLE != dbConn.getTransactionIsolation()) {
+      /** Running this at SERIALIZABLE prevents new locks being added for this txnid(s) concurrently
+        * which would cause them to become orphaned.
+        */
+      throw new IllegalStateException("Expected SERIALIZABLE isolation. Found " + dbConn.getTransactionIsolation());
+    }
     try {
       stmt = dbConn.createStatement();
 
@@ -1321,6 +1415,8 @@ public class TxnHandler {
       LOG.debug("Going to execute update <" + buf.toString() + ">");
       stmt.executeUpdate(buf.toString());
 
+      //todo: seems like we should do this first and if it misses, don't bother with
+      //delete from HIVE_LOCKS since it will be rolled back
       buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED +
         "' where txn_state = '" + TXN_OPEN + "' and txn_id in (");
       first = true;
@@ -1330,6 +1426,9 @@ public class TxnHandler {
         buf.append(id);
       }
       buf.append(')');
+      if(max_heartbeat > 0) {
+        buf.append(" and txn_last_heartbeat < ").append(max_heartbeat);
+      }
       LOG.debug("Going to execute update <" + buf.toString() + ">");
       updateCnt = stmt.executeUpdate(buf.toString());
 
@@ -1340,22 +1439,33 @@ public class TxnHandler {
   }
 
   /**
+   * Isolation Level Notes:
+   * Run at SERIALIZABLE to make sure no one is adding new locks while we are checking conflicts here.
+   * 
+   * Ramblings:
+   * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC
+   * since this is just in Wait state.
+   * (Then we'd need to ensure that in !wait case we don't rely on rollback and again in case of
+   * failure, the W locks will timeout if failure does not propagate to client in some way, or it
+   * will and client will Abort).
+   * Actually, whether we can do this depends on what happens when you try to get a lock and notice
+   * a conflicting locks in W mode do we wait in this case?  if so it's a problem because while you
+   * are checking new locks someone may insert new  W locks that you don't see...
+   * On the other hand, this attempts to be 'fair', i.e. process locks in order so could we assume
+   * that additional W locks will have higher IDs????
+   *
+   * We can use Select for Update to generate the next LockID.  In fact we can easily do this in a separate txn.
+   * This avoids contention on NEXT_LOCK_ID.  The rest of the logic will be still need to be done at Serializable, I think,
+   * but it will not be updating the same row from 2 DB.
+   *
    * Request a lock
    * @param dbConn database connection
    * @param rqst lock information
-   * @param wait whether to wait for this lock.  The function will return immediately one way or
-   *             another.  If true and the lock could not be acquired the response will have a
-   *             state of  WAITING.  The caller will then need to poll using
-   *             {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}. If
-   *             false and the  lock could not be acquired, then the response will have a state
-   *             of NOT_ACQUIRED.  The caller will need to call
-   *             {@link #lockNoWait(org.apache.hadoop.hive.metastore.api.LockRequest)} again to
-   *             attempt another lock.
    * @return information on whether the lock was acquired.
    * @throws NoSuchTxnException
    * @throws TxnAbortedException
    */
-  private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait)
+  private LockResponse lock(Connection dbConn, LockRequest rqst)
     throws NoSuchTxnException,  TxnAbortedException, MetaException, SQLException {
     // We want to minimize the number of concurrent lock requests being issued.  If we do not we
     // get a large number of deadlocks in the database, since this method has to both clean
@@ -1368,13 +1478,25 @@ public class TxnHandler {
     // etc.) that should not interfere with this one.
     synchronized (lockLock) {
       Statement stmt = null;
+      ResultSet rs = null;
       try {
+        long txnid = rqst.getTxnid();
+        if (txnid > 0) {
+          // Heartbeat the transaction so we know it is valid and we avoid it timing out while we
+          // are locking.
+          heartbeatTxn(dbConn, txnid);
+        }
         stmt = dbConn.createStatement();
 
-        // Get the next lock id.
-        String s = "select nl_next from NEXT_LOCK_ID";
+       /** Get the next lock id.
+        * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
+        * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running.  1st one generates nl_next=7,
+        * 2nd nl_next=8.  Then 8 goes first to insert into HIVE_LOCKS and aquires the locks.  Then 7 unblocks,
+        * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
+        * doesn't block on locks acquired later than one it's checking*/
+        String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
         LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        rs = stmt.executeQuery(s);
         if (!rs.next()) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
@@ -1385,18 +1507,19 @@ public class TxnHandler {
         s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
-        LOG.debug("Going to commit.");
-        dbConn.commit();
 
-        long txnid = rqst.getTxnid();
         if (txnid > 0) {
-          // Heartbeat the transaction so we know it is valid and we avoid it timing out while we
-          // are locking.
-          heartbeatTxn(dbConn, txnid);
-
           // For each component in this lock request,
           // add an entry to the txn_components table
           // This must be done before HIVE_LOCKS is accessed
+          
+          //Isolation note:
+          //the !wait option is not actually used anywhere.  W/o that,
+          // if we make CompactionTxnHandler.markCleaned() not delete anything above certain txn_id
+          //then there is not reason why this insert into TXN_COMPONENTS needs to run at Serializable.
+          //
+          // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long
+          //as check lock is at serializable (or any other way to make sure it's exclusive)
           for (LockComponent lc : rqst.getComponent()) {
             String dbName = lc.getDbname();
             String tblName = lc.getTablename();
@@ -1429,34 +1552,42 @@ public class TxnHandler {
             " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
             "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
             " values (" + extLockId + ", " +
-            + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
+            + intLockId + "," + txnid + ", '" +
             dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
             + ", " + (partName == null ? "null" : "'" + partName + "'") +
-            ", '" + LOCK_WAITING + "', " +  "'" + lockChar + "', " + now + ", '" +
+            ", '" + LOCK_WAITING + "', " +  "'" + lockChar + "', " +
+            //for locks associated with a txn, we always heartbeat txn and timeout based on that
+            (isValidTxn(txnid) ? 0 : now) + ", '" +
             rqst.getUser() + "', '" + rqst.getHostname() + "')";
           LOG.debug("Going to execute update <" + s + ">");
           stmt.executeUpdate(s);
         }
-        LockResponse rsp = checkLock(dbConn, extLockId, wait);
-        if (!wait && rsp.getState() != LockState.ACQUIRED) {
-          LOG.debug("Lock not acquired, going to rollback");
-          dbConn.rollback();
-          rsp = new LockResponse();
-          rsp.setState(LockState.NOT_ACQUIRED);
-        }
-        return rsp;
+        /**to make txns shorter we could commit here and start a new txn for checkLock.  This would
+         * require moving checkRetryable() down into here.  Could we then run the part before this
+         * commit are READ_COMMITTED?*/
+        return checkLock(dbConn, extLockId);
       } catch (NoSuchLockException e) {
         // This should never happen, as we just added the lock id
         throw new MetaException("Couldn't find a lock we just created!");
       } finally {
+        close(rs);
         closeStmt(stmt);
       }
     }
   }
-
+  private static boolean isValidTxn(long txnId) {
+    return txnId != 0;
+  }
+  /**
+   * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
+   * all locks for given extLockId or none.  Would be more efficient to update state on all locks
+   * at once.  Semantics are the same since this is all part of the same txn@serializable.
+   *
+   * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+   * hl_lock_ext_id by only checking earlier locks.
+   */
   private LockResponse checkLock(Connection dbConn,
-                                 long extLockId,
-                                 boolean alwaysCommit)
+                                 long extLockId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
     List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
     LockResponse response = new LockResponse();
@@ -1609,19 +1740,15 @@ public class TxnHandler {
             case WAIT:
               if(!ignoreConflict(info, locks[i])) {
                 wait(dbConn, save);
-                if (alwaysCommit) {
-                  // In the case where lockNoWait has been called we don't want to commit because
-                  // it's going to roll everything back. In every other case we want to commit here.
-                  LOG.debug("Going to commit");
-                  dbConn.commit();
-                }
+                LOG.debug("Going to commit");
+                dbConn.commit();
                 response.setState(LockState.WAITING);
                 LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
                 return response;
               }
               //fall through to ACQUIRE
             case ACQUIRE:
-              acquire(dbConn, stmt, extLockId, info.intLockId);
+              acquire(dbConn, stmt, extLockId, info);
               acquired = true;
               break;
             case KEEP_LOOKING:
@@ -1633,7 +1760,7 @@ public class TxnHandler {
 
         // If we've arrived here and we have not already acquired, it means there's nothing in the
         // way of the lock, so acquire the lock.
-        if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
+        if (!acquired) acquire(dbConn, stmt, extLockId, info);
       }
 
       // We acquired all of the locks, so commit and return acquired.
@@ -1677,26 +1804,31 @@ public class TxnHandler {
     dbConn.rollback(save);
   }
 
-  private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId)
+  private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo)
     throws SQLException, NoSuchLockException, MetaException {
     long now = getDbTime(dbConn);
     String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
-      "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
-      extLockId + " and hl_lock_int_id = " + intLockId;
+      //if lock is part of txn, heartbeat info is in txn record
+      "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
+    ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+      extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
     if (rc < 1) {
       LOG.debug("Going to rollback");
       dbConn.rollback();
       throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," +
-        + intLockId + ")");
+        + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId));
     }
     // We update the database, but we don't commit because there may be other
     // locks together with this, and we only want to acquire one if we can
     // acquire all.
   }
 
-  // Heartbeats on the lock table.  This commits, so do not enter it with any state
+  /**
+   * Heartbeats on the lock table.  This commits, so do not enter it with any state.
+   * Should not be called on a lock that belongs to transaction.
+   */
   private void heartbeatLock(Connection dbConn, long extLockId)
     throws NoSuchLockException, SQLException, MetaException {
     // If the lock id is 0, then there are no locks in this heartbeat
@@ -1731,7 +1863,6 @@ public class TxnHandler {
     try {
       stmt = dbConn.createStatement();
       long now = getDbTime(dbConn);
-      ensureValidTxn(dbConn, txnid, stmt);
       String s = "update TXNS set txn_last_heartbeat = " + now +
         " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'";
       LOG.debug("Going to execute update <" + s + ">");
@@ -1742,10 +1873,6 @@ public class TxnHandler {
         dbConn.rollback();
         throw new NoSuchTxnException("No such txn: " + txnid);
       }
-      //update locks for this txn to the same heartbeat
-      s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
       LOG.debug("Going to commit");
       dbConn.commit();
     } finally {
@@ -1760,6 +1887,7 @@ public class TxnHandler {
     LOG.debug("Going to execute query <" + s + ">");
     ResultSet rs = stmt.executeQuery(s);
     if (!rs.next()) {
+      //todo: add LIMIT 1 instead of count - should be more efficient
       s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
       ResultSet rs2 = stmt.executeQuery(s);
       boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
@@ -1775,28 +1903,28 @@ public class TxnHandler {
       LOG.debug("Going to rollback");
       dbConn.rollback();
       throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) +
-        " already aborted");//todo: add time of abort, which is not currently tracked
+        " already aborted");//todo: add time of abort, which is not currently tracked.  Requires schema change
     }
   }
 
-  // NEVER call this function without first calling heartbeat(long, long)
-  private long getTxnIdFromLockId(Connection dbConn, long extLockId)
+  private Long getTxnIdFromLockId(Connection dbConn, long extLockId)
     throws NoSuchLockException, MetaException, SQLException {
     Statement stmt = null;
+    ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
       String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
         extLockId;
       LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
+      rs = stmt.executeQuery(s);
       if (!rs.next()) {
-        throw new MetaException("This should never happen!  We already " +
-          "checked the lock existed but now we can't find it!");
+        return null;
       }
       long txnid = rs.getLong(1);
-      LOG.debug("Return " + JavaUtils.txnIdToString(rs.wasNull() ? -1 : txnid));
-      return (rs.wasNull() ? -1 : txnid);
+      LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid));
+      return txnid;
     } finally {
+      close(rs);
       closeStmt(stmt);
     }
   }
@@ -1832,14 +1960,13 @@ public class TxnHandler {
   // for read-only autoCommit=true statements.  This does a commit,
   // and thus should be done before any calls to heartbeat that will leave
   // open transactions.
-  private void timeOutLocks(Connection dbConn) {
+  private void timeOutLocks(Connection dbConn, long now) {
     Statement stmt = null;
     try {
-      long now = getDbTime(dbConn);
       stmt = dbConn.createStatement();
       // Remove any timed out locks from the table.
       String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
-        (now - timeout) + " and (hl_txnid = 0 or hl_txnid is NULL)";//when txnid is > 0, the lock is
+        (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is
       //associated with a txn and is handled by performTimeOuts()
       //want to avoid expiring locks for a txn w/o expiring the txn itself
       LOG.debug("Going to execute update <" + s + ">");
@@ -1891,6 +2018,8 @@ public class TxnHandler {
     }
   }
   /**
+   * Isolation Level Notes
+   * Plain: RC is OK
    * This will find transactions that have timed out and abort them.
    * Will also delete locks which are not associated with a transaction and have timed out
    * Tries to keep transactions (against metastore db) small to reduce lock contention.
@@ -1900,9 +2029,19 @@ public class TxnHandler {
     Statement stmt = null;
     ResultSet rs = null;
     try {
-      dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      //We currently commit after selecting the TXNS to abort.  So whether SERIALIZABLE
+      //READ_COMMITTED, the effect is the same.  We could use FOR UPDATE on Select from TXNS
+      //and do the whole performTimeOuts() in a single huge transaction, but the only benefit
+      //would be to make sure someone cannot heartbeat one of these txns at the same time.
+      //The attempt to heartbeat would block and fail immediately after it's unblocked.
+      //With current (RC + multiple txns) implementation it is possible for someone to send
+      //heartbeat at the very end of the expire interval, and just after the Select from TXNS
+      //is made, in which case heartbeat will succeed but txn will still be Aborted.
+      //Solving this corner case is not worth the perf penalty.  The client should heartbeat in a
+      //timely way.
       long now = getDbTime(dbConn);
-      timeOutLocks(dbConn);
+      timeOutLocks(dbConn, now);
       while(true) {
         stmt = dbConn.createStatement();
         String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
@@ -1923,16 +2062,26 @@ public class TxnHandler {
             timedOutTxns.add(currentBatch);
           }
         } while(rs.next());
-        close(rs, stmt, null);
         dbConn.commit();
+        close(rs, stmt, dbConn);
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        int numTxnsAborted = 0;
         for(List<Long> batchToAbort : timedOutTxns) {
-          abortTxns(dbConn, batchToAbort);
-          dbConn.commit();
-          //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
-          LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString());
+          if(abortTxns(dbConn, batchToAbort, now - timeout) == batchToAbort.size()) {
+            dbConn.commit();
+            numTxnsAborted += batchToAbort.size();
+            //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
+            LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString());
+          }
+          else {
+            //could not abort all txns in this batch - this may happen because in parallel with this
+            //operation there was activity on one of the txns in this batch (commit/abort/heartbeat)
+            //This is not likely but may happen if client experiences long pause between heartbeats or
+            //unusually long/extreme pauses between heartbeat() calls and other logic in checkLock(),
+            //lock(), etc.
+            dbConn.rollback();
+          }
         }
-        int numTxnsAborted = (timedOutTxns.size() - 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE +
-          timedOutTxns.get(timedOutTxns.size() - 1).size();
         LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout");
       }
     } catch (SQLException ex) {
@@ -2110,4 +2259,97 @@ public class TxnHandler {
   private static String getMessage(SQLException ex) {
     return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
   }
+  /**
+   * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE} TRANSACTION_READ_COMMITTED, etc.
+   * Different DBs support different concurrency management options.  This class relies on SELECT ... FOR UPDATE
+   * functionality.  Where that is not available, SERIALIZABLE isolation is used.
+   * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}, in that
+   * if FOR UPDATE is not available, must run operation at SERIALIZABLE.
+   */
+  private int getRequiredIsolationLevel() throws MetaException, SQLException {
+    if(dbProduct == null) {
+      Connection tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      determineDatabaseProduct(tmp);
+      closeDbConn(tmp);
+    }
+    switch (dbProduct) {
+      case DERBY:
+        return Connection.TRANSACTION_SERIALIZABLE;
+      case MYSQL:
+      case ORACLE:
+      case POSTGRES:
+      case SQLSERVER:
+        return Connection.TRANSACTION_READ_COMMITTED;
+      default:
+        String msg = "Unrecognized database product name <" + dbProduct + ">";
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+  }
+  /**
+   * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent
+   * construct.  If the DB doesn't support, return original select.  This method must always
+   * agree with {@link #getRequiredIsolationLevel()}
+   */
+  private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException {
+    DatabaseProduct prod = determineDatabaseProduct(dbConn);
+    switch (prod) {
+      case DERBY:
+        //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
+        //sadly in Derby, FOR UPDATE doesn't meant what it should
+        return selectStatement;
+      case MYSQL:
+        //http://dev.mysql.com/doc/refman/5.7/en/select.html
+      case ORACLE:
+        //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
+      case POSTGRES:
+        //http://www.postgresql.org/docs/9.0/static/sql-select.html
+        return selectStatement + " for update";
+      case SQLSERVER:
+        //https://msdn.microsoft.com/en-us/library/ms189499.aspx
+        //https://msdn.microsoft.com/en-us/library/ms187373.aspx
+        return selectStatement + " with(updlock)";
+      default:
+        String msg = "Unrecognized database product name <" + prod + ">";
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+  }
+  /**
+   * the caller is expected to retry if this fails
+   *
+   * @return
+   * @throws SQLException
+   * @throws MetaException
+   */
+  private long generateNewExtLockId() throws SQLException, MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      dbConn = getDbConn(getRequiredIsolationLevel());
+      stmt = dbConn.createStatement();
+
+      // Get the next lock id.
+      String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new MetaException("Transaction tables not properly " +
+          "initialized, no record found in next_lock_id");
+      }
+      long extLockId = rs.getLong(1);
+      s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+      LOG.debug("Going to commit.");
+      dbConn.commit();
+      return extLockId;
+    }
+    finally {
+      close(rs, stmt, dbConn);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d78fea10/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index df42f1a..06e0932 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -280,43 +280,6 @@ public class TestCompactionTxnHandler {
   }
 
   @Test
-  public void testLockNoWait() throws Exception {
-    // Test that we can acquire the lock alone
-     LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lockNoWait(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.unlock(new UnlockRequest(res.getLockid()));
-
-    // test that another lock blocks it
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertEquals(LockState.ACQUIRED, res.getState());
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lockNoWait(req);
-    assertEquals(LockState.NOT_ACQUIRED, res.getState());
-    assertEquals(1, TxnDbUtil.findNumCurrentLocks());
-  }
-
-  @Test
   public void testFindPotentialCompactions() throws Exception {
     // Test that committing unlocks
     long txnid = openTxn();

http://git-wip-us.apache.org/repos/asf/hive/blob/d78fea10/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 59114fe..4debd04 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1107,8 +1107,8 @@ public class TestTxnHandler {
         assertNull(lock.getPartname());
         assertEquals(LockState.ACQUIRED, lock.getState());
         assertEquals(LockType.EXCLUSIVE, lock.getType());
-        assertTrue(begining <= lock.getLastheartbeat() &&
-            System.currentTimeMillis() >= lock.getLastheartbeat());
+        assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+            lock.getTxnid() != 0);
         assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
             + " and " + System.currentTimeMillis(),
             begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1122,8 +1122,8 @@ public class TestTxnHandler {
         assertNull(lock.getPartname());
         assertEquals(LockState.WAITING, lock.getState());
         assertEquals(LockType.SHARED_READ, lock.getType());
-        assertTrue(begining <= lock.getLastheartbeat() &&
-            System.currentTimeMillis() >= lock.getLastheartbeat());
+        assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+          lock.getTxnid() != 0);
         assertEquals(0, lock.getAcquiredat());
         assertEquals("me", lock.getUser());
         assertEquals("localhost", lock.getHostname());
@@ -1135,7 +1135,7 @@ public class TestTxnHandler {
         assertEquals("yourpartition", lock.getPartname());
         assertEquals(LockState.ACQUIRED, lock.getState());
         assertEquals(LockType.SHARED_WRITE, lock.getType());
-        assertTrue(begining <= lock.getLastheartbeat() &&
+        assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
             System.currentTimeMillis() >= lock.getLastheartbeat());
         assertTrue(begining <= lock.getAcquiredat() &&
             System.currentTimeMillis() >= lock.getAcquiredat());

http://git-wip-us.apache.org/repos/asf/hive/blob/d78fea10/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 9a5495b..ceaafe1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1046,7 +1046,7 @@ public class Driver implements CommandProcessor {
         // don't update it after that until txn completes.  Thus the check for {@code initiatingTransaction}
         //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
         //for each statement.
-        recordValidTxns();
+        recordValidTxns();//todo: we should only need to do this for RO query if it has ACID resources in it.
       }
 
       return 0;