Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/01/27 12:56:08 UTC

[GitHub] [hive] veghlaci05 opened a new pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

veghlaci05 opened a new pull request #2981:
URL: https://github.com/apache/hive/pull/2981


   ### What changes were proposed in this pull request?
   
   This PR fixes an issue where the Heartbeater of a CompactionTxn is not shut down. This could happen if CompactionTxn.abort() or CompactionTxn.commit() throws an exception that is not caught by the catch block. The fix reworks heartbeat handling in the following way:
   1. The heartbeater is shut down BEFORE calling abort() or commit(), so it won't stay alive regardless of the outcome of the transaction or whether an exception is thrown. Another benefit is that this approach also solves the issue of logging after commit()/abort(), without the need for the additional logic introduced previously (see HIVE-25740).
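
The ordering described above can be sketched in plain Java. This is only an illustration under stated assumptions: `HeartbeatingTxn`, its `failOnCommit` flag, and the no-op heartbeat task are hypothetical stand-ins, not the actual `Worker.CompactionTxn` code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for CompactionTxn; not the actual Hive class.
class HeartbeatingTxn implements AutoCloseable {
  private final ScheduledExecutorService heartbeatExecutor =
      Executors.newSingleThreadScheduledExecutor();
  private final boolean failOnCommit;

  HeartbeatingTxn(boolean failOnCommit) {
    this.failOnCommit = failOnCommit;
    // Periodic no-op standing in for the real heartbeat call.
    heartbeatExecutor.scheduleAtFixedRate(() -> { }, 0, 10, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() throws Exception {
    // Stop heartbeating first, so an exception thrown below cannot leave it running.
    heartbeatExecutor.shutdownNow();
    heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS);
    commit();
  }

  private void commit() {
    if (failOnCommit) {
      throw new RuntimeException("simulated commit failure");
    }
  }

  boolean isHeartbeatTerminated() {
    return heartbeatExecutor.isTerminated();
  }

  // Returns true if the heartbeater is terminated even though commit() threw.
  static boolean heartbeatStoppedDespiteFailure() {
    HeartbeatingTxn txn = new HeartbeatingTxn(true);
    try {
      txn.close();
    } catch (Exception expected) {
      // The simulated commit failure is expected here.
    }
    return txn.isHeartbeatTerminated();
  }
}
```

Had `close()` called `commit()` first, the simulated exception would have skipped the shutdown and the scheduled task would have kept running.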
   
   ### Why are the changes needed?
   To fix the issue that in some cases the heartbeater of a CompactionTxn is not shut down properly.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   
   Through unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796496875



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +176,82 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    final IMetaStoreClient mockedClient = Mockito.spy(HiveMetaStoreUtils.getHiveMetastoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    class Capture<T> implements Answer {

Review comment:
       I think this is more readable:
   ````
       AtomicReference<Worker.CompactionTxn> capture = new AtomicReference<>();
       
       doAnswer(invocation -> {
         Worker.CompactionTxn compactionTxn = (Worker.CompactionTxn) invocation.callRealMethod();
         capture.set(compactionTxn);
         return compactionTxn;
       }).when(worker).newCompactionTxn();
   
       worker.run();
   
       //Check if the heartbeating is properly terminated
       Assert.assertTrue(capture.get().isHeartbeatTerminated());
   ````






[GitHub] [hive] veghlaci05 commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796521994



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +176,82 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    final IMetaStoreClient mockedClient = Mockito.spy(HiveMetaStoreUtils.getHiveMetastoreClient(conf));

Review comment:
       I can replace the factory call with the constructor, but without the mockmaker, the capture does not work at all. It seems capturing method results also requires this feature to be enabled.






[GitHub] [hive] veghlaci05 commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r798540039



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
           }
-          LOG.debug("Successfully stopped heartbeating for transaction {}", this);
         } catch (InterruptedException ex) {
-          heartbeatExecutor.shutdownNow();
+          //Caller thread was interrupted while waiting for heartbeater to terminate, nothing to do

Review comment:
       I don't think it is necessary. That log would be something like "Compaction was aborted while waiting for the heartbeater to shut down", which doesn't add much in my opinion.






[GitHub] [hive] zabetak commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
zabetak commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r797437477



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
##########
@@ -88,4 +90,13 @@ static String getCompactorJobQueueName(HiveConf conf, CompactionInfo ci, Table t
     }
     return null;
   }
+
+  public static ThreadFactory createThreadFactory(String threadNameFormat) {
+    return new ThreadFactoryBuilder()
+            .setPriority(Thread.currentThread().getPriority())
+            .setDaemon(Thread.currentThread().isDaemon())

Review comment:
       Do we want the heartbeater to run with the same options as the current thread? From the code below it seems that it was previously a daemon thread with min priority.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -207,26 +206,16 @@ static void gatherStats(CompactionInfo ci, HiveConf conf, String userName, Strin
     }
   }
 
-  static final class CompactionHeartbeater extends Thread {
+  static final class CompactionHeartbeater implements Runnable {
     static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
     private final CompactionTxn compactionTxn;
     private final String tableName;
     private final HiveConf conf;
-    private final AtomicBoolean errorLogEnabled;
 
     public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) {
       this.tableName = Objects.requireNonNull(tableName);
       this.compactionTxn = Objects.requireNonNull(compactionTxn);
       this.conf = Objects.requireNonNull(conf);
-      this.errorLogEnabled = new AtomicBoolean(true);
-
-      setDaemon(true);
-      setPriority(MIN_PRIORITY);
-      setName("CompactionHeartbeater-" + compactionTxn.getTxnId());
-    }
-
-    public void shouldLogError(boolean shouldLogError) {
-      this.errorLogEnabled.set(shouldLogError);

Review comment:
       Why was it necessary to have conditional error logging in HIVE-25740?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +174,73 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    AtomicReference<Worker.CompactionTxn> capture = new AtomicReference<>();
+    doAnswer(invocation -> {
+      Worker.CompactionTxn compactionTxn = (Worker.CompactionTxn) invocation.callRealMethod();
+      capture.set(compactionTxn);
+      return compactionTxn;
+    }).when(worker).newCompactionTxn();
+
+    worker.run();
+
+    //Check if the heartbeating is properly terminated
+    Assert.assertTrue(capture.get().isHeartbeatTerminated());

Review comment:
       Can't we simply use `Thread.getAllStackTraces()` to find out whether a thread is running? This would probably make `Mockito` redundant and could possibly allow us to remove `isHeartbeatTerminated()`. Moreover, at the moment `isHeartbeatTerminated()` returns true if the executor is `null`, so I am not sure the assertion is completely valid.
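
A minimal sketch of the `Thread.getAllStackTraces()` idea, assuming the heartbeater thread can be recognized by a name prefix. The class name, the helper `isThreadAlive`, and the "Demo Heartbeater" thread name are hypothetical and only serve the illustration; this is not code from the PR.

```java
import java.util.concurrent.CountDownLatch;

class LiveThreadCheck {
  // Returns true if any live thread's name starts with the given prefix.
  static boolean isThreadAlive(String namePrefix) {
    return Thread.getAllStackTraces().keySet().stream()
        .anyMatch(t -> t.isAlive() && t.getName().startsWith(namePrefix));
  }

  // Returns {found while the thread was running, found after it exited}.
  static boolean[] demo() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    Thread worker = new Thread(() -> {
      started.countDown();
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "Demo Heartbeater 0");
    worker.setDaemon(true);
    worker.start();
    try {
      started.await();
      boolean whileRunning = isThreadAlive("Demo Heartbeater");
      release.countDown();
      worker.join();
      boolean afterExit = isThreadAlive("Demo Heartbeater");
      return new boolean[] {whileRunning, afterExit};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

A check like this depends on the thread name staying stable, so a test built on it would be coupled to the name format passed to the thread factory.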

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);

Review comment:
       This is a potential thread leak. Wouldn't it be better to log it as an error, or at least as a warning?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
 
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
     }
 
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
       I see places where we explicitly set msc to null. I don't know this part of the code well, but if the null checks are redundant, wouldn't it be better to handle that in a separate PR?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
           }
-          LOG.debug("Successfully stopped heartbeating for transaction {}", this);
         } catch (InterruptedException ex) {
-          heartbeatExecutor.shutdownNow();
+          //Caller thread was interrupted while waiting for heartbeater to terminate, nothing to do

Review comment:
       Probably worth logging instead of completely ignoring the interruption.
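
For reference, a common Java idiom for this situation is to log the interruption and restore the caller's interrupt flag. The sketch below is an assumption about how that could look, not what the PR does; `InterruptAwareShutdown`, `shutdownAndWait`, and the deliberately stubborn task are hypothetical, and `System.err` stands in for the logger.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InterruptAwareShutdown {
  // Returns true if the executor terminated within the timeout.
  static boolean shutdownAndWait(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdownNow();
    try {
      return executor.awaitTermination(timeout, unit);
    } catch (InterruptedException ex) {
      // Record the interruption, then restore the flag so callers can still observe it.
      System.err.println("Interrupted while waiting for the heartbeater to terminate");
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Returns {executor terminated, interrupt flag restored on the calling thread}.
  static boolean[] demo() {
    CountDownLatch release = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // A task that ignores interrupts until released, so the executor cannot terminate early.
    executor.execute(() -> {
      boolean done = false;
      while (!done) {
        try {
          release.await();
          done = true;
        } catch (InterruptedException ignored) {
          // Keep waiting; this simulates a task that does not stop promptly.
        }
      }
    });
    // Simulate the caller being interrupted before it starts waiting.
    Thread.currentThread().interrupt();
    boolean terminated = shutdownAndWait(executor, 5, TimeUnit.SECONDS);
    boolean flagRestored = Thread.interrupted(); // reads and clears the flag
    release.countDown(); // let the worker thread finish
    return new boolean[] {terminated, flagRestored};
  }
}
```

Restoring the flag matters mostly when code further up the call stack checks for interruption; if nothing does, the log line alone carries the information.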






[GitHub] [hive] deniskuzZ merged pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ merged pull request #2981:
URL: https://github.com/apache/hive/pull/2981


   




[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796499328



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
 
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
     }
 
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
       Shouldn't we restore the null checks in this PR?






[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r797369739



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -22,6 +22,7 @@
 import static org.apache.hadoop.hive.ql.TestTxnCommands2.runInitiator;
 import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;

Review comment:
       Why a wildcard import?






[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796522231



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -339,15 +326,19 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
     CompactionInfo ci = null;
     boolean computeStats = false;
     Table t1 = null;
-    try (CompactionTxn compactionTxn = new CompactionTxn()) {
-      if (msc == null) {
-        try {
-          msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
-        } catch (Exception e) {
-          LOG.error("Failed to connect to HMS", e);
-          return false;
-        }
+
+    // If an exception is thrown in the try-with-resources block below, msc is closed and nulled, so a new instance
+    // is need to be obtained here.
+    if (msc == null) {
+      try {
+        msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+      } catch (Exception e) {
+        LOG.error("Failed to connect to HMS", e);
+        return false;

Review comment:
       Shouldn't we throw an exception here?






[GitHub] [hive] veghlaci05 commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796522672



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -696,11 +682,15 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory("CompactionTxn Heartbeater %d"));

Review comment:
       No, that is a built-in feature; it will be a sequential number assigned by the thread factory.
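
To illustrate how a `%d` placeholder in the name format turns into a sequential number, here is a plain-JDK approximation. It only mimics the behavior described above and is not Guava's `ThreadFactoryBuilder`; the class name is hypothetical, and starting the counter at 0 is an assumption about how the real builder numbers threads.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

class NamedThreadFactoryDemo {
  // Plain-JDK approximation of a name-format thread factory.
  static ThreadFactory createThreadFactory(String threadNameFormat) {
    AtomicLong counter = new AtomicLong(0); // assumed starting value
    return runnable -> {
      Thread thread = new Thread(runnable);
      // Each new thread gets the next number substituted for %d.
      thread.setName(String.format(threadNameFormat, counter.getAndIncrement()));
      return thread;
    };
  }

  // Returns the names of the first two threads created by one factory.
  static String[] firstTwoNames() {
    ThreadFactory factory = createThreadFactory("CompactionTxn Heartbeater %d");
    Runnable noop = () -> { };
    return new String[] {
        factory.newThread(noop).getName(),
        factory.newThread(noop).getName()
    };
  }
}
```

The counter belongs to the factory instance, so numbering restarts whenever a new factory is created, for example once per transaction if the factory is built inside `open()`.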






[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796497897



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +176,82 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    final IMetaStoreClient mockedClient = Mockito.spy(HiveMetaStoreUtils.getHiveMetastoreClient(conf));

Review comment:
       Use
   ````
   final IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
   ````
   and remove the MockMaker.






[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796522231



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -339,15 +326,19 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
     CompactionInfo ci = null;
     boolean computeStats = false;
     Table t1 = null;
-    try (CompactionTxn compactionTxn = new CompactionTxn()) {
-      if (msc == null) {
-        try {
-          msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
-        } catch (Exception e) {
-          LOG.error("Failed to connect to HMS", e);
-          return false;
-        }
+
+    // If an exception is thrown in the try-with-resources block below, msc is closed and nulled, so a new instance
+    // needs to be obtained here.
+    if (msc == null) {
+      try {
+        msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+      } catch (Exception e) {
+        LOG.error("Failed to connect to HMS", e);
+        return false;

Review comment:
       Shouldn't we throw an exception here? That would allow us to simplify the code:
   ````
   msc = Optional.ofNullable(msc).orElse(HiveMetaStoreUtils.getHiveMetastoreClient(conf));
   ````
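
A side note on the one-liner above, offered as a sketch rather than as part of the review: `Optional.orElse` evaluates its argument eagerly, so the fallback expression runs even when the existing value is non-null, whereas `orElseGet` takes a `Supplier` and defers the call. The example below uses a hypothetical `connect()` stand-in (not a Hive API) that counts its invocations. If the real factory throws a checked exception (the diff wraps it in `catch (Exception e)`), it would not fit into a plain `Supplier` lambda without wrapping.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

class LazyFallbackDemo {
  // Counts how often the hypothetical factory below is invoked.
  static final AtomicInteger CONNECT_CALLS = new AtomicInteger();

  // Hypothetical stand-in for an expensive client factory; not a Hive API.
  static String connect() {
    CONNECT_CALLS.incrementAndGet();
    return "new-client";
  }

  // Eager: connect() is evaluated before orElse is even called.
  static String eager(String existing) {
    return Optional.ofNullable(existing).orElse(connect());
  }

  // Lazy: connect() only runs when existing is null.
  static String lazy(String existing) {
    return Optional.ofNullable(existing).orElseGet(LazyFallbackDemo::connect);
  }
}
```

With a cached value, `eager("cached")` still calls `connect()` once and discards the result, while `lazy("cached")` does not call it at all.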






[GitHub] [hive] veghlaci05 commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r800461666



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
       I would keep it as-is. Denys and I think the utility method can be reused elsewhere in the future, and the thread naming is also more explicit this way.






[GitHub] [hive] zabetak commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
zabetak commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r799803119



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
       nit: I would keep the thread name unchanged, just to avoid potentially breaking people's scripts that analyze stack traces etc.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
       nit: You could even avoid introducing a new utility method (which is used in only one place) and a dependency on Guava via:
   ```
   heartbeatExecutor = Executors.newSingleThreadScheduledExecutor((runnable) -> {
     Thread t = new Thread(runnable);
     t.setDaemon(true);
     t.setName("CompactionHeartbeater-" + txnId);
     t.setPriority(Thread.MIN_PRIORITY);
     return t;
   });
   ```
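
The lambda in this suggestion works because `ThreadFactory` is a functional interface. A self-contained sketch of the same idea follows; the class name, the `newHeartbeatExecutor` and `workerThreadName` helpers, and the example transaction id are made up for illustration and are not part of the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatExecutorSketch {
  // Builds a single-threaded scheduler whose worker is a named, low-priority daemon thread.
  static ScheduledExecutorService newHeartbeatExecutor(long txnId) {
    return Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread t = new Thread(runnable);
      t.setDaemon(true);
      t.setName("CompactionHeartbeater-" + txnId);
      t.setPriority(Thread.MIN_PRIORITY);
      return t;
    });
  }

  // Runs one task on the executor and reports the name of the thread that executed it.
  static String workerThreadName(long txnId) {
    ScheduledExecutorService executor = newHeartbeatExecutor(txnId);
    Callable<String> task = () -> Thread.currentThread().getName();
    try {
      return executor.submit(task).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("could not read the worker thread name", e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Because the name is set inside the factory, it shows up in stack traces and thread dumps for every task the executor runs.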

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
       No strong feelings, up to you :)

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +173,67 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    worker.run();
+
+    //Check if the heartbeating is properly terminated
+    Assert.assertTrue(Thread.getAllStackTraces().keySet()
+            .stream().noneMatch(k -> k.getName().contains("CompactionTxn Heartbeater")));

Review comment:
       Maybe an assertion above, verifying that the heartbeater really runs at some point, would be useful.
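
One possible shape for such a check, sketched with plain JDK calls only: look for a live thread with the expected name while the executor is active, then look again after shutdown. The class, the `observe` helper and the thread name used here are hypothetical; the sketch also joins the worker thread before the second check, on the assumption that a thread can remain visible for a short moment after the executor has been shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HeartbeatThreadCheck {
  // True if any live thread's name contains the given fragment.
  static boolean threadExists(String nameFragment) {
    return Thread.getAllStackTraces().keySet().stream()
        .anyMatch(t -> t.getName().contains(nameFragment));
  }

  // Returns {seenWhileRunning, seenAfterShutdown} for a demo heartbeat thread.
  static boolean[] observe(String threadName) {
    AtomicReference<Thread> worker = new AtomicReference<>();
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, threadName);
      t.setDaemon(true);
      worker.set(t);
      return t;
    });
    CountDownLatch firstBeat = new CountDownLatch(1);
    try {
      executor.scheduleAtFixedRate(firstBeat::countDown, 0, 50, TimeUnit.MILLISECONDS);
      // Wait for at least one beat, so we know the task really ran.
      boolean seenWhileRunning = firstBeat.await(5, TimeUnit.SECONDS) && threadExists(threadName);
      executor.shutdownNow();
      // Join the worker so the second check does not race with thread exit.
      worker.get().join(5000);
      boolean seenAfterShutdown = threadExists(threadName);
      return new boolean[] {seenWhileRunning, seenAfterShutdown};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```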

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -657,24 +649,19 @@ private String getWorkerId() {
   /**
    * Keep track of the compaction's transaction and its operations.
    */
-  private class CompactionTxn implements AutoCloseable {
+  class CompactionTxn implements AutoCloseable {

Review comment:
       Do we need to change visibility?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
 
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
     }
 
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
       @veghlaci05 have you seen my comment above?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless of its state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
           }
-          LOG.debug("Successfully stopped heartbeating for transaction {}", this);
         } catch (InterruptedException ex) {
-          heartbeatExecutor.shutdownNow();
+          //Caller thread was interrupted while waiting for heartbeater to terminate, nothing to do

Review comment:
       If there is an interrupt during `heartbeatExecutor#awaitTermination`, we will never see a log (or stack trace) showing that there was an attempt to shut down the heartbeater. Moreover, I don't feel very comfortable about completely swallowing an InterruptedException. The least that I would expect here is:
   
   `Thread.currentThread().interrupt();`
   
   but if you have thought about it thoroughly and it is not necessary, then I trust your judgement.
   
   TLDR 
   Citing "Java Concurrency in Practice":
   Propagate the InterruptedException. This is often the most sensible policy if you can get away with it: just propagate the InterruptedException to your caller. This could involve not catching InterruptedException, or catching it and throwing it again after performing some brief activity-specific cleanup.
   
   Restore the interrupt. Sometimes you cannot throw InterruptedException, for instance when your code is part of a Runnable. In these situations, you must catch InterruptedException and restore the interrupted status by calling interrupt on the current thread, so that code higher up the call stack can see that an interrupt was issued, as demonstrated in Listing 5.10.
   
   You can get much more sophisticated with interruption, but these two approaches should work in the vast majority of situations. But there is one thing you should not do with InterruptedException: catch it and do nothing in response. This deprives code higher up on the call stack of the opportunity to act on the interruption, because the evidence that the thread was interrupted is lost. The only situation in which it is acceptable to swallow an interrupt is when you are extending Thread and therefore control all the code higher up on the call stack. Cancellation and interruption are covered in greater detail in Chapter 7.
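
The "restore the interrupt" policy, applied to an executor shutdown, could look roughly like the sketch below. This is not the code from the PR: `InterruptAwareShutdown`, its `shutdown` helper and the `interruptSurvives` demo are hypothetical names, and the demo task spins on purpose so that the executor cannot terminate before the wait is interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InterruptAwareShutdown {
  // Stops the executor and waits briefly; returns true if it terminated in time.
  static boolean shutdown(ExecutorService executor, long timeoutMillis) {
    executor.shutdownNow();
    try {
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      // Restore the flag so callers higher up can still observe the interrupt.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Demo: interrupts the calling thread before shutdown() and reports whether
  // the interrupt flag is still set afterwards.
  static boolean interruptSurvives() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    executor.execute(() -> {
      started.countDown();
      // Spins without reacting to interrupts, keeping the executor busy.
      while (release.getCount() > 0) {
        Thread.yield();
      }
    });
    try {
      started.await();
      Thread.currentThread().interrupt(); // simulate an interrupt arriving during shutdown
      shutdown(executor, 2000);
      return Thread.interrupted();        // reads and clears the flag
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
    }
  }
}
```

Returning `false` from `shutdown` keeps the method signature free of checked exceptions, while the restored flag still lets a caller such as a worker loop notice the interruption.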






[GitHub] [hive] veghlaci05 commented on pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on pull request #2981:
URL: https://github.com/apache/hive/pull/2981#issuecomment-1032448081


   @zabetak Do you have any other points regarding this PR? If not, we would like to merge it to master.




[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796518863



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -696,11 +682,15 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory("CompactionTxn Heartbeater %d"));

Review comment:
       What would the id be? Would it be compactionTxn.getTxnId()?






[GitHub] [hive] veghlaci05 commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796526484



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
 
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
     }
 
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
       Since the re-initialization of the msc field was moved out of the try-with-resources block, it is no longer necessary.






[GitHub] [hive] deniskuzZ commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796516351



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,29 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless of its state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
-        heartbeatExecutor.shutdownNow();
         try {
+          heartbeatExecutor.shutdownNow();

Review comment:
       Let's move it out of the try-catch.






[GitHub] [hive] veghlaci05 commented on a change in pull request #2981: HIVE-25898: Compaction txn heartbeating after Worker timeout

Posted by GitBox <gi...@apache.org>.
veghlaci05 commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r797458415



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -207,26 +206,16 @@ static void gatherStats(CompactionInfo ci, HiveConf conf, String userName, Strin
     }
   }
 
-  static final class CompactionHeartbeater extends Thread {
+  static final class CompactionHeartbeater implements Runnable {
     static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
     private final CompactionTxn compactionTxn;
     private final String tableName;
     private final HiveConf conf;
-    private final AtomicBoolean errorLogEnabled;
 
     public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) {
       this.tableName = Objects.requireNonNull(tableName);
       this.compactionTxn = Objects.requireNonNull(compactionTxn);
       this.conf = Objects.requireNonNull(conf);
-      this.errorLogEnabled = new AtomicBoolean(true);
-
-      setDaemon(true);
-      setPriority(MIN_PRIORITY);
-      setName("CompactionHeartbeater-" + compactionTxn.getTxnId());
-    }
-
-    public void shouldLogError(boolean shouldLogError) {
-      this.errorLogEnabled.set(shouldLogError);

Review comment:
       If you check the issue, a race condition could cause a last heartbeat after commit/abort. The added logic fixed that. In my change I moved the heartbeat interrupt before commit/abort, making the entire logging logic unnecessary.
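
The effect of this ordering can be illustrated with a stripped-down model. `SketchTxn` below is hypothetical: it only mimics the shape of `CompactionTxn.close()` from the diff, uses a no-op task in place of the real heartbeat, and throws from `commit()` to simulate a failure.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SketchTxn implements AutoCloseable {
  private final ScheduledExecutorService heartbeatExecutor =
      Executors.newSingleThreadScheduledExecutor();
  private final boolean failOnCommit;

  SketchTxn(boolean failOnCommit) {
    this.failOnCommit = failOnCommit;
    // Stand-in for the periodic heartbeat; the real one would call the metastore.
    heartbeatExecutor.scheduleAtFixedRate(() -> { }, 0, 50, TimeUnit.MILLISECONDS);
  }

  boolean isHeartbeatShutdown() {
    return heartbeatExecutor.isShutdown();
  }

  private void commit() {
    if (failOnCommit) {
      throw new IllegalStateException("simulated commit failure");
    }
  }

  @Override
  public void close() {
    // Stop heartbeating first, so a failing commit cannot leave it running.
    heartbeatExecutor.shutdownNow();
    commit();
  }
}
```

Even when `commit()` throws, the executor has already been shut down, so no further heartbeat can be scheduled after the transaction outcome is decided.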





