You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/02/01 11:22:00 UTC

[jira] [Work logged] (HIVE-25898) Compaction txn heartbeating after Worker timeout

     [ https://issues.apache.org/jira/browse/HIVE-25898?focusedWorklogId=718575&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-718575 ]

ASF GitHub Bot logged work on HIVE-25898:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Feb/22 11:21
            Start Date: 01/Feb/22 11:21
    Worklog Time Spent: 10m 
      Work Description: deniskuzZ commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r796496875



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +176,82 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    final IMetaStoreClient mockedClient = Mockito.spy(HiveMetaStoreUtils.getHiveMetastoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    class Capture<T> implements Answer {

Review comment:
       i think it's more readable:
   ````
       AtomicReference<Worker.CompactionTxn> capture = new AtomicReference<>();
       
       doAnswer(invocation -> {
         Worker.CompactionTxn compactionTxn = (Worker.CompactionTxn) invocation.callRealMethod();
         capture.set(compactionTxn);
         return compactionTxn;
       }).when(worker).newCompactionTxn();
   
       worker.run();
   
       //Check if the heartbeating is properly terminated
       Assert.assertTrue(capture.get().isHeartbeatTerminated());
   ````




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 718575)
    Time Spent: 20m  (was: 10m)

> Compaction txn heartbeating after Worker timeout
> ------------------------------------------------
>
>                 Key: HIVE-25898
>                 URL: https://issues.apache.org/jira/browse/HIVE-25898
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> In some cases, when the compaction transaction is aborted, the hearbeater thread is not shut down and keeps heartbeating.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)