You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/05/13 16:22:04 UTC

[GitHub] [hive] belugabehr commented on a change in pull request #2270: HIVE-25112: Simplify TXN Compactor Heartbeat Thread

belugabehr commented on a change in pull request #2270:
URL: https://github.com/apache/hive/pull/2270#discussion_r631935634



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -250,49 +251,48 @@ void gatherStats() {
 
   static final class CompactionHeartbeater extends Thread {
     static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
-    private final AtomicBoolean stop = new AtomicBoolean();
     private final CompactionTxn compactionTxn;
     private final String tableName;
     private final HiveConf conf;
-    private final long interval;
+    private final long txnTimeout;
+
     public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) {
-      this.tableName = tableName;
-      this.compactionTxn = compactionTxn;
-      this.conf = conf;
+      this.tableName = Objects.requireNonNull(tableName);
+      this.compactionTxn = Objects.requireNonNull(compactionTxn);
+      this.conf = Objects.requireNonNull(conf);
+
+      this.txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
 
-      this.interval =
-          MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
       setDaemon(true);
       setPriority(MIN_PRIORITY);
       setName("CompactionHeartbeater-" + compactionTxn.getTxnId());
     }
+
     @Override
     public void run() {
+      LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactionTxn, tableName);
+
       IMetaStoreClient msc = null;
       try {
-        // We need to create our own metastore client since the thrifts clients
-        // are not thread safe.
+        // Create a metastore client for each thread since it is not thread safe
         msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
-        LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactionTxn, tableName);
-        while(!stop.get()) {
+        while (true) {
           msc.heartbeat(compactionTxn.getTxnId(), 0);
-          Thread.sleep(interval);
+
+          // Send a heart beat before a timeout occurs. Scale the interval based
+          // on the server's transaction timeout allowance
+          Thread.sleep(txnTimeout / 2);
         }
+      } catch (InterruptedException ie) {
+        LOG.debug("Successfully stop the heartbeating the transaction {}", this.compactionTxn);
       } catch (Exception e) {
-        LOG.error("Error while heartbeating txn {} in {}, error: ", compactionTxn, Thread.currentThread().getName(), e.getMessage());

Review comment:
       The thread name will be captured by the logging framework, and the message too.  Just need to pass in the entire Exception object.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org