You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/05 09:53:00 UTC
[hive] branch master updated: HIVE-22420: DbTxnManager.stopHeartbeat() should be thread-safe (Aron Hamvas via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b15c1b6 HIVE-22420: DbTxnManager.stopHeartbeat() should be thread-safe (Aron Hamvas via Peter Vary)
b15c1b6 is described below
commit b15c1b6530c0886f947acab28cd5af07a1f4376f
Author: Aron Hamvas <ha...@cloudera.com>
AuthorDate: Tue Nov 5 10:52:22 2019 +0100
HIVE-22420: DbTxnManager.stopHeartbeat() should be thread-safe (Aron Hamvas via Peter Vary)
---
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 101 ++++++++++++++-------
1 file changed, 67 insertions(+), 34 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 76934bc..2dbbde9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
/**
* An implementation of HiveTxnManager that stores the transactions in the metastore database.
@@ -161,6 +162,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
private ScheduledFuture<?> heartbeatTask = null;
private Runnable shutdownRunner = null;
private static final int SHUTDOWN_HOOK_PRIORITY = 0;
+ private final ReentrantLock heartbeatTaskLock = new ReentrantLock();
/**
* We do this on every call to make sure TM uses same MS connection as is used by the caller (Driver,
@@ -276,9 +278,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
catch(LockException e) {
if(e.getCause() instanceof TxnAbortedException) {
- txnId = 0;
- stmtId = -1;
- tableWriteIds.clear();
+ resetTxnInfo();
}
throw e;
}
@@ -535,11 +535,18 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
if (!isTxnOpen()) {
throw new RuntimeException("Attempt to rollback before opening a transaction");
}
+ stopHeartbeat();
+
try {
lockMgr.clearLocalLockRecords();
- stopHeartbeat();
LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
- getMS().rollbackTxn(txnId);
+
+ // Re-checking as txn could have been closed, in the meantime, by a competing thread.
+ if (isTxnOpen()) {
+ getMS().rollbackTxn(txnId);
+ } else {
+ LOG.warn("Transaction is already closed.");
+ }
} catch (NoSuchTxnException e) {
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
@@ -549,10 +556,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
} finally {
- txnId = 0;
- stmtId = -1;
- numStatements = 0;
- tableWriteIds.clear();
+ resetTxnInfo();
}
}
@@ -635,11 +639,21 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException("error while getting current user,", e);
}
- Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser);
- heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater);
- LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval +
- " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
- return heartbeater;
+ try {
+ heartbeatTaskLock.lock();
+ if (heartbeatTask != null) {
+ throw new IllegalStateException("Heartbeater is already started.");
+ }
+
+ Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser);
+ heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater);
+ LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval +
+ " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
+
+ return heartbeater;
+ } finally {
+ heartbeatTaskLock.unlock();
+ }
}
private ScheduledFuture<?> startHeartbeat(long initialDelay, long heartbeatInterval, Runnable heartbeater) {
@@ -657,30 +671,49 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
return task;
}
- private void stopHeartbeat() throws LockException {
- if (heartbeatTask != null) {
- heartbeatTask.cancel(true);
- long startTime = System.currentTimeMillis();
- long sleepInterval = 100;
- while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
- // We will wait for 30 seconds for the task to be cancelled.
- // If it's still not cancelled (unlikely), we will just move on.
- long now = System.currentTimeMillis();
- if (now - startTime > 30000) {
- LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
- break;
+ private void stopHeartbeat() {
+ if (heartbeatTask == null) {
+ // avoid unnecessary locking if the field is null
+ return;
+ }
+
+ boolean isLockAcquired = false;
+ try {
+ // The lock should not be held by other thread trying to stop the heartbeat for more than 31 seconds
+ isLockAcquired = heartbeatTaskLock.tryLock(31000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // safe to go on
+ }
+
+ try {
+ if (isLockAcquired && heartbeatTask != null) {
+ heartbeatTask.cancel(true);
+ long startTime = System.currentTimeMillis();
+ long sleepInterval = 100;
+ while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+ // We will wait for 30 seconds for the task to be cancelled.
+ // If it's still not cancelled (unlikely), we will just move on.
+ long now = System.currentTimeMillis();
+ if (now - startTime > 30000) {
+ LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
+ break;
+ }
+ try {
+ Thread.sleep(sleepInterval);
+ } catch (InterruptedException e) {
+ }
+ sleepInterval *= 2;
}
- try {
- Thread.sleep(sleepInterval);
- } catch (InterruptedException e) {
+ if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
+ LOG.info("Stopped heartbeat for query: " + queryId);
}
- sleepInterval *= 2;
+ heartbeatTask = null;
+ queryId = null;
}
- if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
- LOG.info("Stopped heartbeat for query: " + queryId);
+ } finally {
+ if (isLockAcquired) {
+ heartbeatTaskLock.unlock();
}
- heartbeatTask = null;
- queryId = null;
}
}