You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/05 09:53:00 UTC

[hive] branch master updated: HIVE-22420: DbTxnManager.stopHeartbeat() should be thread-safe (Aron Hamvas via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b15c1b6  HIVE-22420: DbTxnManager.stopHeartbeat() should be thread-safe (Aron Hamvas via Peter Vary)
b15c1b6 is described below

commit b15c1b6530c0886f947acab28cd5af07a1f4376f
Author: Aron Hamvas <ha...@cloudera.com>
AuthorDate: Tue Nov 5 10:52:22 2019 +0100

    HIVE-22420: DbTxnManager.stopHeartbeat() should be thread-safe (Aron Hamvas via Peter Vary)
---
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       | 101 ++++++++++++++-------
 1 file changed, 67 insertions(+), 34 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 76934bc..2dbbde9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * An implementation of HiveTxnManager that stores the transactions in the metastore database.
@@ -161,6 +162,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   private ScheduledFuture<?> heartbeatTask = null;
   private Runnable shutdownRunner = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
+  private final ReentrantLock heartbeatTaskLock = new ReentrantLock();
 
   /**
    * We do this on every call to make sure TM uses same MS connection as is used by the caller (Driver,
@@ -276,9 +278,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
-        txnId = 0;
-        stmtId = -1;
-        tableWriteIds.clear();
+        resetTxnInfo();
       }
       throw e;
     }
@@ -535,11 +535,18 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     if (!isTxnOpen()) {
       throw new RuntimeException("Attempt to rollback before opening a transaction");
     }
+    stopHeartbeat();
+
     try {
       lockMgr.clearLocalLockRecords();
-      stopHeartbeat();
       LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
-      getMS().rollbackTxn(txnId);
+
+      // Re-checking as txn could have been closed, in the meantime, by a competing thread.
+      if (isTxnOpen()) {
+        getMS().rollbackTxn(txnId);
+      } else {
+        LOG.warn("Transaction is already closed.");
+      }
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
@@ -549,10 +556,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
     } finally {
-      txnId = 0;
-      stmtId = -1;
-      numStatements = 0;
-      tableWriteIds.clear();
+      resetTxnInfo();
     }
   }
 
@@ -635,11 +639,21 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException("error while getting current user,", e);
     }
 
-    Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser);
-    heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater);
-    LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval +
-        " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
-    return heartbeater;
+    try {
+      heartbeatTaskLock.lock();
+      if (heartbeatTask != null) {
+        throw new IllegalStateException("Heartbeater is already started.");
+      }
+
+      Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser);
+      heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater);
+      LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval +
+          " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
+
+      return heartbeater;
+    } finally {
+      heartbeatTaskLock.unlock();
+    }
   }
 
   private ScheduledFuture<?> startHeartbeat(long initialDelay, long heartbeatInterval, Runnable heartbeater) {
@@ -657,30 +671,49 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     return task;
   }
 
-  private void stopHeartbeat() throws LockException {
-    if (heartbeatTask != null) {
-      heartbeatTask.cancel(true);
-      long startTime = System.currentTimeMillis();
-      long sleepInterval = 100;
-      while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
-        // We will wait for 30 seconds for the task to be cancelled.
-        // If it's still not cancelled (unlikely), we will just move on.
-        long now = System.currentTimeMillis();
-        if (now - startTime > 30000) {
-          LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
-          break;
+  private void stopHeartbeat() {
+    if (heartbeatTask == null) {
+      // avoid unnecessary locking if the field is null
+      return;
+    }
+
+    boolean isLockAcquired = false;
+    try {
+      // The lock should not be held by other thread trying to stop the heartbeat for more than 31 seconds
+      isLockAcquired = heartbeatTaskLock.tryLock(31000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // safe to go on
+    }
+
+    try {
+      if (isLockAcquired && heartbeatTask != null) {
+        heartbeatTask.cancel(true);
+        long startTime = System.currentTimeMillis();
+        long sleepInterval = 100;
+        while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+          // We will wait for 30 seconds for the task to be cancelled.
+          // If it's still not cancelled (unlikely), we will just move on.
+          long now = System.currentTimeMillis();
+          if (now - startTime > 30000) {
+            LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
+            break;
+          }
+          try {
+            Thread.sleep(sleepInterval);
+          } catch (InterruptedException e) {
+          }
+          sleepInterval *= 2;
         }
-        try {
-          Thread.sleep(sleepInterval);
-        } catch (InterruptedException e) {
+        if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
+          LOG.info("Stopped heartbeat for query: " + queryId);
         }
-        sleepInterval *= 2;
+        heartbeatTask = null;
+        queryId = null;
       }
-      if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
-        LOG.info("Stopped heartbeat for query: " + queryId);
+    } finally {
+      if (isLockAcquired) {
+        heartbeatTaskLock.unlock();
       }
-      heartbeatTask = null;
-      queryId = null;
     }
   }