You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/05/19 11:05:40 UTC
[hive] branch master updated: HIVE-23234: Optimize TxnHandler::allocateTableWriteIds (Marton Bod reviewed by Peter Vary, Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2f7a607 HIVE-23234: Optimize TxnHandler::allocateTableWriteIds (Marton Bod reviewed by Peter Vary, Denys Kuzmenko)
2f7a607 is described below
commit 2f7a6076846387e4f84f9b356e8c6af15b8aa4dd
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Tue May 19 13:03:21 2020 +0200
HIVE-23234: Optimize TxnHandler::allocateTableWriteIds (Marton Bod reviewed by Peter Vary, Denys Kuzmenko)
---
.../hadoop/hive/metastore/txn/TestTxnHandler.java | 55 ++++++++
.../hive/metastore/txn/CompactionTxnHandler.java | 14 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 146 ++++++++++-----------
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 4 +-
4 files changed, 127 insertions(+), 92 deletions(-)
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index f65619e..569605f 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -77,6 +77,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1754,6 +1755,60 @@ public class TestTxnHandler {
assertFalse(failed);
}
+ @Test
+ public void allocateNextWriteIdRetriesAfterDetectingConflictingConcurrentInsert() throws Exception {
+ String dbName = "abc";
+ String tableName = "def";
+ int numTxns = 2;
+ try (Connection dbConn = ((TxnHandler) txnHandler).getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Statement stmt = dbConn.createStatement()) {
+ // run this multiple times to get write-write conflicts with relatively high chance
+ for (int i = 0; i < 20; ++i) {
+ // make sure these 2 tables have no records of our dbName.tableName
+ // this ensures that allocateTableWriteIds() will try to insert into next_write_id (instead of update)
+ stmt.executeUpdate("TRUNCATE TABLE \"NEXT_WRITE_ID\"");
+ stmt.executeUpdate("TRUNCATE TABLE \"TXN_TO_WRITE_ID\"");
+ dbConn.commit();
+
+ OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(numTxns, "me", "localhost"));
+ AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tableName);
+ resp.getTxn_ids().forEach(request::addToTxnIds);
+
+ // thread 1: allocating write ids for dbName.tableName
+ CompletableFuture<AllocateTableWriteIdsResponse> future1 = CompletableFuture.supplyAsync(() -> {
+ try {
+ return txnHandler.allocateTableWriteIds(request);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // thread 2: simulating another thread allocating write ids for the same dbName.tableName
+ // (using direct DB insert as a workaround)
+ CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep(10);
+ stmt.executeUpdate(String.format("INSERT INTO \"NEXT_WRITE_ID\" " +
+ "VALUES ('%s', '%s', 1)", dbName, tableName));
+ dbConn.commit();
+ } catch (Exception e) {
+ LOG.warn("Inserting next_write_id directly into DB failed: " + e.getMessage());
+ }
+ });
+
+ CompletableFuture.allOf(future1, future2).join();
+
+ // validate that all write ids allocation attempts have (eventually) succeeded
+ AllocateTableWriteIdsResponse result = future1.get();
+ assertEquals(2, result.getTxnToWriteIds().size());
+ assertEquals(i * numTxns + 1, result.getTxnToWriteIds().get(0).getTxnId());
+ assertEquals(1, result.getTxnToWriteIds().get(0).getWriteId());
+ assertEquals(i * numTxns + 2, result.getTxnToWriteIds().get(1).getTxnId());
+ assertEquals(2, result.getTxnToWriteIds().get(1).getWriteId());
+ }
+ }
+ }
+
private void updateTxns(Connection conn) throws SQLException {
Statement stmt = conn.createStatement();
stmt.executeUpdate("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = \"TXN_LAST_HEARTBEAT\" + 1");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d59f863..d2efc59 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -1097,12 +1097,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
- try {
- checkRetryable(dbConn, e, "markFailed(" + ci + ")");
- }
- catch(MetaException ex) {
- LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
- }
+ checkRetryable(dbConn, e, "markFailed(" + ci + ")");
LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
} finally {
close(rs, stmt, null);
@@ -1132,12 +1127,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
- try {
- checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
- }
- catch(MetaException ex) {
- LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
- }
+ checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
} finally {
close(null, stmt, dbConn);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index cf41ef8..89ddccb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -271,6 +271,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
"WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
+ private static final String TXN_TO_WRITE_ID_INSERT_QUERY = "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " +
+ "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)";
private List<TransactionalMetaStoreEventListener> transactionalListeners;
@@ -1649,7 +1651,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
PreparedStatement pStmt = null;
List<PreparedStatement> insertPreparedStmts = null;
ResultSet rs = null;
- TxnStore.MutexAPI.LockHandle handle = null;
List<String> params = Arrays.asList(dbName, tblName);
try {
lockInternal();
@@ -1701,7 +1702,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
int numAborts = abortTxns(dbConn, txnIds, false);
assert(numAborts == numAbortedWrites);
}
- handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
// There are some txns in the list which has no write id allocated and hence go ahead and do it.
// Get the next write id for the given table and update it with new next write id.
@@ -1723,7 +1723,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+ checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")", true);
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
@@ -1734,9 +1734,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
closeStmt(pStmt);
close(rs, stmt, dbConn);
- if (handle != null) {
- handle.releaseLocks();
- }
unlockInternal();
}
} catch (RetryException e) {
@@ -1759,13 +1756,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
- List<Long> abortedWriteIds = new ArrayList<>();
- for (long writeId : validWriteIdList.getInvalidWriteIds()) {
- if (validWriteIdList.isWriteIdAborted(writeId)) {
- abortedWriteIds.add(writeId);
- }
- }
- return abortedWriteIds;
+ return Arrays.stream(validWriteIdList.getInvalidWriteIds())
+ .filter(validWriteIdList::isWriteIdAborted)
+ .boxed()
+ .collect(Collectors.toList());
}
private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Long writeId) throws MetaException,
@@ -1955,17 +1949,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String tblName = rqst.getTableName().toLowerCase();
try {
Connection dbConn = null;
- Statement stmt = null;
PreparedStatement pStmt = null;
- List<PreparedStatement> insertPreparedStmts = null;
ResultSet rs = null;
- TxnStore.MutexAPI.LockHandle handle = null;
List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
List<TxnToWriteId> srcTxnToWriteIds = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
if (rqst.isSetReplPolicy()) {
srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
@@ -1974,7 +1964,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
assert (!rqst.isSetTxnIds());
assert (!srcTxnToWriteIds.isEmpty());
- for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
+ for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
srcTxnIds.add(txnToWriteId.getTxnId());
}
txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
@@ -1992,21 +1982,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
//Easiest check since we can't differentiate do we handle singleton list or list with multiple txn ids.
- if(txnIds.size() > 1) {
+ if (txnIds.size() > 1) {
Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
}
// Check if all the input txns are in valid state.
// Write IDs should be allocated only for open and not read-only transactions.
- if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
- String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
- + " failed for input txns: "
- + getAbortedAndReadOnlyTxns(txnIds, stmt)
- + getCommittedTxns(txnIds, stmt);
- LOG.error(errorMsg);
-
- throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
- + " as not all input txns in open state or read-only");
+ try (Statement stmt = dbConn.createStatement()) {
+ if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
+ String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
+ + " failed for input txns: "
+ + getAbortedAndReadOnlyTxns(txnIds, stmt)
+ + getCommittedTxns(txnIds, stmt);
+ LOG.error(errorMsg);
+
+ throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
+ + " as not all input txns in open state or read-only");
+ }
}
List<String> queries = new ArrayList<>();
@@ -2017,14 +2009,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// write id for the same db.table. If yes, then need to reuse it else have to allocate new one
// The write id would have been already allocated in case of multi-statement txns where
// first write on a table will allocate write id and rest of the writes should re-use it.
- prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
- + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?" + " AND ");
+ prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
+ .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
txnIds, "\"T2W_TXNID\"", false, false);
long allocatedTxnsCount = 0;
- long txnId;
- long writeId = 0;
+ long writeId;
List<String> params = Arrays.asList(dbName, tblName);
for (String query : queries) {
pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
@@ -2033,7 +2024,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
rs = pStmt.executeQuery();
while (rs.next()) {
// If table write ID is already allocated for the given transaction, then just use it
- txnId = rs.getLong(1);
+ long txnId = rs.getLong(1);
writeId = rs.getLong(2);
txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
allocatedTxnsCount++;
@@ -2054,17 +2045,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
long srcWriteId = 0;
if (rqst.isSetReplPolicy()) {
// In replication flow, we always need to allocate write ID equal to that of source.
- assert(srcTxnToWriteIds != null);
+ assert (srcTxnToWriteIds != null);
srcWriteId = srcTxnToWriteIds.get(0).getWriteId();
}
- handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
// There are some txns in the list which does not have write id allocated and hence go ahead and do it.
// Get the next write id for the given table and update it with new next write id.
// This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
String s = sqlGenerator.addForUpdateClause(
- "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?");
+ "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?");
closeStmt(pStmt);
pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
@@ -2076,7 +2066,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// For repl flow, we need to force set the incoming write id.
writeId = (srcWriteId > 0) ? srcWriteId : 1;
s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, "
- + Long.toString(writeId + numOfWriteIds) + ")";
+ + (writeId + numOfWriteIds) + ")";
closeStmt(pStmt);
pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
@@ -2087,7 +2077,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
// Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
- s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + Long.toString(writeId + numOfWriteIds)
+ s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + (writeId + numOfWriteIds)
+ " WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?";
closeStmt(pStmt);
pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
@@ -2109,24 +2099,29 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
- // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
- // write ids
- List<String> rows = new ArrayList<>();
- List<List<String>> paramsList = new ArrayList<>();
- for (long txn : txnIds) {
- rows.add(txn + ", ?, ?, " + writeId);
- txnToWriteIds.add(new TxnToWriteId(txn, writeId));
- paramsList.add(params);
- LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
- writeId++;
- }
+ // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated write ids
+ try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_TO_WRITE_ID_INSERT_QUERY)) {
+ for (long txnId : txnIds) {
+ pstmt.setLong(1, txnId);
+ pstmt.setString(2, dbName);
+ pstmt.setString(3, tblName);
+ pstmt.setLong(4, writeId);
+ pstmt.addBatch();
- // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows,
- paramsList);
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.execute();
+ txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+ LOG.info("Allocated writeId: " + writeId + " for txnId: " + txnId);
+ writeId++;
+ if (txnToWriteIds.size() % maxBatchSize == 0) {
+ LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+ "Batch size: " + maxBatchSize);
+ pstmt.executeBatch();
+ }
+ }
+ if (txnToWriteIds.size() % maxBatchSize != 0) {
+ LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+ "Batch size: " + txnToWriteIds.size() % maxBatchSize);
+ pstmt.executeBatch();
+ }
}
if (transactionalListeners != null) {
@@ -2136,27 +2131,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
dbConn, sqlGenerator);
}
- LOG.info("Allocated write ids for the table: " + dbName + "." + tblName);
- LOG.debug("Going to commit");
+ LOG.info("Allocated write ids for dbName={}, tblName={} (txnIds: {})", dbName, tblName, rqst.getTxnIds());
dbConn.commit();
return new AllocateTableWriteIdsResponse(txnToWriteIds);
} catch (SQLException e) {
- LOG.debug("Going to rollback");
+ LOG.error("Exception during write ids allocation for request={}. Will retry if possible.", rqst, e);
rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
+ checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")", true);
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
- if (insertPreparedStmts != null) {
- for (PreparedStatement pst : insertPreparedStmts) {
- closeStmt(pst);
- }
- }
- closeStmt(pStmt);
- close(rs, stmt, dbConn);
- if(handle != null) {
- handle.releaseLocks();
- }
+ close(rs, pStmt, dbConn);
unlockInternal();
}
} catch (RetryException e) {
@@ -2169,12 +2154,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
PreparedStatement pst = null;
- TxnStore.MutexAPI.LockHandle handle = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
//since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry
//for this table. It also has a unique index in case 'should not' is violated
@@ -2197,9 +2180,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
+ StringUtils.stringifyException(e));
} finally {
close(null, pst, dbConn);
- if(handle != null) {
- handle.releaseLocks();
- }
unlockInternal();
}
} catch (RetryException e) {
@@ -4106,6 +4086,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
return false;
}
+
+ /**
+ * See {@link #checkRetryable(Connection, SQLException, String, boolean)}.
+ */
+ protected void checkRetryable(Connection conn, SQLException e, String caller) throws RetryException {
+ checkRetryable(conn, e, caller, false);
+ }
+
/**
* Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do
* this, so we have to inspect the error messages and catch the telltale signs for each
@@ -4114,11 +4102,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* @param conn database connection
* @param e exception that was thrown.
* @param caller name of the method calling this (and other info useful to log)
+ * @param retryOnDuplicateKey whether to retry on unique key constraint violation
* @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried
*/
- protected void checkRetryable(Connection conn,
- SQLException e,
- String caller) throws RetryException, MetaException {
+ protected void checkRetryable(Connection conn, SQLException e, String caller, boolean retryOnDuplicateKey)
+ throws RetryException {
// If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
// to test these changes.
@@ -4150,6 +4138,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
} else if (isRetryable(conf, e)) {
//in MSSQL this means Communication Link Failure
sendRetrySignal = waitForRetry(caller, e.getMessage());
+ } else if (retryOnDuplicateKey && isDuplicateKeyError(e)) {
+ sendRetrySignal = waitForRetry(caller, e.getMessage());
}
else {
//make sure we know we saw an error that we don't recognize
@@ -5210,7 +5200,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
break;
case SQLSERVER:
//2627 is unique constaint violation incl PK, 2601 - unique key
- if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+ if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) && "23000".equals(ex.getSQLState())) {
return true;
}
break;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 1e177f4..3e441b5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -45,8 +45,8 @@ public interface TxnStore extends Configurable {
String TXN_KEY_START = "_meta";
enum MUTEX_KEY {
- Initiator, Cleaner, HouseKeeper, CheckLock, TxnCleaner,
- CompactionScheduler, WriteIdAllocator, MaterializationRebuild
+ Initiator, Cleaner, HouseKeeper, TxnCleaner,
+ CompactionScheduler, MaterializationRebuild
}
// Compactor states (Should really be enum)
String INITIATED_RESPONSE = "initiated";