You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/05/19 11:05:40 UTC

[hive] branch master updated: HIVE-23234: Optimize TxnHandler::allocateTableWriteIds (Marton Bod reviewed by Peter Vary, Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f7a607  HIVE-23234: Optimize TxnHandler::allocateTableWriteIds (Marton Bod reviewed by Peter Vary, Denys Kuzmenko)
2f7a607 is described below

commit 2f7a6076846387e4f84f9b356e8c6af15b8aa4dd
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Tue May 19 13:03:21 2020 +0200

    HIVE-23234: Optimize TxnHandler::allocateTableWriteIds (Marton Bod reviewed by Peter Vary, Denys Kuzmenko)
---
 .../hadoop/hive/metastore/txn/TestTxnHandler.java  |  55 ++++++++
 .../hive/metastore/txn/CompactionTxnHandler.java   |  14 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 146 ++++++++++-----------
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   4 +-
 4 files changed, 127 insertions(+), 92 deletions(-)

diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index f65619e..569605f 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -77,6 +77,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1754,6 +1755,60 @@ public class TestTxnHandler {
     assertFalse(failed);
   }
 
+  @Test
+  public void allocateNextWriteIdRetriesAfterDetectingConflictingConcurrentInsert() throws Exception {
+    String dbName = "abc";
+    String tableName = "def";
+    int numTxns = 2;
+    try (Connection dbConn = ((TxnHandler) txnHandler).getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         Statement stmt = dbConn.createStatement()) {
+      // run this multiple times to get write-write conflicts with relatively high chance
+      for (int i = 0; i < 20; ++i) {
+        // make sure these 2 tables have no records of our dbName.tableName
+        // this ensures that allocateTableWriteIds() will try to insert into next_write_id (instead of update)
+        stmt.executeUpdate("TRUNCATE TABLE \"NEXT_WRITE_ID\"");
+        stmt.executeUpdate("TRUNCATE TABLE \"TXN_TO_WRITE_ID\"");
+        dbConn.commit();
+
+        OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(numTxns, "me", "localhost"));
+        AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tableName);
+        resp.getTxn_ids().forEach(request::addToTxnIds);
+
+        // thread 1: allocating write ids for dbName.tableName
+        CompletableFuture<AllocateTableWriteIdsResponse> future1 = CompletableFuture.supplyAsync(() -> {
+          try {
+            return txnHandler.allocateTableWriteIds(request);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        // thread 2: simulating another thread allocating write ids for the same dbName.tableName
+        // (using direct DB insert as a workaround)
+        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
+          try {
+            Thread.sleep(10);
+            stmt.executeUpdate(String.format("INSERT INTO \"NEXT_WRITE_ID\" " +
+                "VALUES ('%s', '%s', 1)", dbName, tableName));
+            dbConn.commit();
+          } catch (Exception e) {
+            LOG.warn("Inserting next_write_id directly into DB failed: " + e.getMessage());
+          }
+        });
+
+        CompletableFuture.allOf(future1, future2).join();
+
+        // validate that all write ids allocation attempts have (eventually) succeeded
+        AllocateTableWriteIdsResponse result = future1.get();
+        assertEquals(2, result.getTxnToWriteIds().size());
+        assertEquals(i * numTxns + 1, result.getTxnToWriteIds().get(0).getTxnId());
+        assertEquals(1, result.getTxnToWriteIds().get(0).getWriteId());
+        assertEquals(i * numTxns + 2, result.getTxnToWriteIds().get(1).getTxnId());
+        assertEquals(2, result.getTxnToWriteIds().get(1).getWriteId());
+      }
+    }
+  }
+
   private void updateTxns(Connection conn) throws SQLException {
     Statement stmt = conn.createStatement();
     stmt.executeUpdate("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = \"TXN_LAST_HEARTBEAT\" + 1");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d59f863..d2efc59 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -1097,12 +1097,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        try {
-          checkRetryable(dbConn, e, "markFailed(" + ci + ")");
-        }
-        catch(MetaException ex) {
-          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
-        }
+        checkRetryable(dbConn, e, "markFailed(" + ci + ")");
         LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
       } finally {
         close(rs, stmt, null);
@@ -1132,12 +1127,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        try {
-          checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
-        }
-        catch(MetaException ex) {
-          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
-        }
+        checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
         LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
       } finally {
         close(null, stmt, dbConn);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index cf41ef8..89ddccb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -271,6 +271,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
   private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
       "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
+  private static final String TXN_TO_WRITE_ID_INSERT_QUERY = "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " +
+      "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)";
 
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
@@ -1649,7 +1651,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       PreparedStatement pStmt = null;
       List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       List<String> params = Arrays.asList(dbName, tblName);
       try {
         lockInternal();
@@ -1701,7 +1702,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           int numAborts = abortTxns(dbConn, txnIds, false);
           assert(numAborts == numAbortedWrites);
         }
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
 
         // There are some txns in the list which has no write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
@@ -1723,7 +1723,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")", true);
         throw new MetaException("Unable to update transaction database "
                 + StringUtils.stringifyException(e));
       } finally {
@@ -1734,9 +1734,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
         closeStmt(pStmt);
         close(rs, stmt, dbConn);
-        if (handle != null) {
-          handle.releaseLocks();
-        }
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -1759,13 +1756,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
-    List<Long> abortedWriteIds = new ArrayList<>();
-    for (long writeId : validWriteIdList.getInvalidWriteIds()) {
-      if (validWriteIdList.isWriteIdAborted(writeId)) {
-        abortedWriteIds.add(writeId);
-      }
-    }
-    return abortedWriteIds;
+    return Arrays.stream(validWriteIdList.getInvalidWriteIds())
+        .filter(validWriteIdList::isWriteIdAborted)
+        .boxed()
+        .collect(Collectors.toList());
   }
 
   private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Long writeId) throws MetaException,
@@ -1955,17 +1949,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     String tblName = rqst.getTableName().toLowerCase();
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       PreparedStatement pStmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
       List<TxnToWriteId> srcTxnToWriteIds = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         if (rqst.isSetReplPolicy()) {
           srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
@@ -1974,7 +1964,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           assert (!rqst.isSetTxnIds());
           assert (!srcTxnToWriteIds.isEmpty());
 
-          for (TxnToWriteId txnToWriteId :  srcTxnToWriteIds) {
+          for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
             srcTxnIds.add(txnToWriteId.getTxnId());
           }
           txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
@@ -1992,21 +1982,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
 
         //Easiest check since we can't differentiate do we handle singleton list or list with multiple txn ids.
-        if(txnIds.size() > 1) {
+        if (txnIds.size() > 1) {
           Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
         }
 
         // Check if all the input txns are in valid state.
         // Write IDs should be allocated only for open and not read-only transactions.
-        if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
-          String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
-              + " failed for input txns: "
-              + getAbortedAndReadOnlyTxns(txnIds, stmt)
-              + getCommittedTxns(txnIds, stmt);
-          LOG.error(errorMsg);
-
-          throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
-              + " as not all input txns in open state or read-only");
+        try (Statement stmt = dbConn.createStatement()) {
+          if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
+            String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
+                    + " failed for input txns: "
+                    + getAbortedAndReadOnlyTxns(txnIds, stmt)
+                    + getCommittedTxns(txnIds, stmt);
+            LOG.error(errorMsg);
+
+            throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
+                    + " as not all input txns in open state or read-only");
+          }
         }
 
         List<String> queries = new ArrayList<>();
@@ -2017,14 +2009,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
         // The write id would have been already allocated in case of multi-statement txns where
         // first write on a table will allocate write id and rest of the writes should re-use it.
-        prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
-                        + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?" + " AND ");
+        prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
+              .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
                 txnIds, "\"T2W_TXNID\"", false, false);
 
         long allocatedTxnsCount = 0;
-        long txnId;
-        long writeId = 0;
+        long writeId;
         List<String> params = Arrays.asList(dbName, tblName);
         for (String query : queries) {
           pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
@@ -2033,7 +2024,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           rs = pStmt.executeQuery();
           while (rs.next()) {
             // If table write ID is already allocated for the given transaction, then just use it
-            txnId = rs.getLong(1);
+            long txnId = rs.getLong(1);
             writeId = rs.getLong(2);
             txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
             allocatedTxnsCount++;
@@ -2054,17 +2045,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         long srcWriteId = 0;
         if (rqst.isSetReplPolicy()) {
           // In replication flow, we always need to allocate write ID equal to that of source.
-          assert(srcTxnToWriteIds != null);
+          assert (srcTxnToWriteIds != null);
           srcWriteId = srcTxnToWriteIds.get(0).getWriteId();
         }
 
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
 
         // There are some txns in the list which does not have write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
         // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
         String s = sqlGenerator.addForUpdateClause(
-                "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?");
+            "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?");
         closeStmt(pStmt);
         pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
         LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
@@ -2076,7 +2066,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           // For repl flow, we need to force set the incoming write id.
           writeId = (srcWriteId > 0) ? srcWriteId : 1;
           s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, "
-                  + Long.toString(writeId + numOfWriteIds) + ")";
+                  + (writeId + numOfWriteIds) + ")";
           closeStmt(pStmt);
           pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
           LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
@@ -2087,7 +2077,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
 
           // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
-          s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + Long.toString(writeId + numOfWriteIds)
+          s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + (writeId + numOfWriteIds)
                   + " WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?";
           closeStmt(pStmt);
           pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
@@ -2109,24 +2099,29 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
         }
 
-        // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
-        // write ids
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
-        for (long txn : txnIds) {
-          rows.add(txn + ", ?, ?, " + writeId);
-          txnToWriteIds.add(new TxnToWriteId(txn, writeId));
-          paramsList.add(params);
-          LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
-          writeId++;
-        }
+        // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated write ids
+        try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_TO_WRITE_ID_INSERT_QUERY)) {
+          for (long txnId : txnIds) {
+            pstmt.setLong(1, txnId);
+            pstmt.setString(2, dbName);
+            pstmt.setString(3, tblName);
+            pstmt.setLong(4, writeId);
+            pstmt.addBatch();
 
-        // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-                "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows,
-                paramsList);
-        for (PreparedStatement pst : insertPreparedStmts) {
-          pst.execute();
+            txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+            LOG.info("Allocated writeId: " + writeId + " for txnId: " + txnId);
+            writeId++;
+            if (txnToWriteIds.size() % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+                  "Batch size: " + maxBatchSize);
+              pstmt.executeBatch();
+            }
+          }
+          if (txnToWriteIds.size() % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+                "Batch size: " + txnToWriteIds.size() % maxBatchSize);
+            pstmt.executeBatch();
+          }
         }
 
         if (transactionalListeners != null) {
@@ -2136,27 +2131,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                   dbConn, sqlGenerator);
         }
 
-        LOG.info("Allocated write ids for the table: " + dbName + "." + tblName);
-        LOG.debug("Going to commit");
+        LOG.info("Allocated write ids for dbName={}, tblName={} (txnIds: {})", dbName, tblName, rqst.getTxnIds());
         dbConn.commit();
         return new AllocateTableWriteIdsResponse(txnToWriteIds);
       } catch (SQLException e) {
-        LOG.debug("Going to rollback");
+        LOG.error("Exception during write ids allocation for request={}. Will retry if possible.", rqst, e);
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
+        checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")", true);
         throw new MetaException("Unable to update transaction database "
                 + StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
-        closeStmt(pStmt);
-        close(rs, stmt, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
+        close(rs, pStmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -2169,12 +2154,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       PreparedStatement pst = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
 
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
         //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry
         //for this table.  It also has a unique index in case 'should not' is violated
 
@@ -2197,9 +2180,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             + StringUtils.stringifyException(e));
       } finally {
         close(null, pst, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -4106,6 +4086,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
     return false;
   }
+
+  /**
+   * See {@link #checkRetryable(Connection, SQLException, String, boolean)}.
+   */
+  protected void checkRetryable(Connection conn, SQLException e, String caller) throws RetryException {
+    checkRetryable(conn, e, caller, false);
+  }
+
   /**
    * Determine if an exception was such that it makes sense to retry.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
@@ -4114,11 +4102,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    * @param conn database connection
    * @param e exception that was thrown.
    * @param caller name of the method calling this (and other info useful to log)
+   * @param retryOnDuplicateKey whether to retry on unique key constraint violation
    * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried
    */
-  protected void checkRetryable(Connection conn,
-                                SQLException e,
-                                String caller) throws RetryException, MetaException {
+  protected void checkRetryable(Connection conn, SQLException e, String caller, boolean retryOnDuplicateKey)
+      throws RetryException {
 
     // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
     // to test these changes.
@@ -4150,6 +4138,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       } else if (isRetryable(conf, e)) {
         //in MSSQL this means Communication Link Failure
         sendRetrySignal = waitForRetry(caller, e.getMessage());
+      } else if (retryOnDuplicateKey && isDuplicateKeyError(e)) {
+        sendRetrySignal = waitForRetry(caller, e.getMessage());
       }
       else {
         //make sure we know we saw an error that we don't recognize
@@ -5210,7 +5200,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         break;
       case SQLSERVER:
         //2627 is unique constaint violation incl PK, 2601 - unique key
-        if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+        if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) && "23000".equals(ex.getSQLState())) {
           return true;
         }
         break;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 1e177f4..3e441b5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -45,8 +45,8 @@ public interface TxnStore extends Configurable {
   String TXN_KEY_START = "_meta";
 
   enum MUTEX_KEY {
-    Initiator, Cleaner, HouseKeeper, CheckLock, TxnCleaner,
-    CompactionScheduler, WriteIdAllocator, MaterializationRebuild
+    Initiator, Cleaner, HouseKeeper, TxnCleaner,
+    CompactionScheduler, MaterializationRebuild
   }
   // Compactor states (Should really be enum)
   String INITIATED_RESPONSE = "initiated";