Posted to commits@hive.apache.org by dk...@apache.org on 2021/12/08 09:00:08 UTC

[hive] branch master updated: HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e119ea  HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)
0e119ea is described below

commit 0e119eaddb93dc10743bb8990ce8eca4fb77cf16
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Wed Dec 8 10:59:48 2021 +0200

    HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)
    
    Closes #2825
---
 .../hive/ql/txn/compactor/TestCompactor.java       |  4 ++
 .../metastore/txn/TestCompactionTxnHandler.java    | 35 ++++++++---
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  2 +
 .../apache/hadoop/hive/ql/TestTxnCommands3.java    |  1 +
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  |  1 +
 .../hive/metastore/txn/CompactionTxnHandler.java   | 69 ++++++++++++----------
 6 files changed, 72 insertions(+), 40 deletions(-)
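
The heart of the change is findReadyToClean in CompactionTxnHandler (the last hunk below). Instead of returning every entry in the READY_FOR_CLEANING state, the query now returns, per (database, table, partition) group, only the entry with the lowest CQ_HIGHEST_WRITE_ID; the rest of the group is deferred to later Cleaner cycles, so no two threads ever clean the same table or partition concurrently. A minimal sketch of that selection logic, using a hypothetical in-memory QueueEntry record rather than the real metastore schema:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical stand-in for a COMPACTION_QUEUE row; the fields mirror the CQ_* columns.
    record QueueEntry(long id, String db, String table, String partition, long highestWriteId) {}

    class ReadyToCleanSketch {
      // For each (db, table, partition) group keep only the entry with the minimal
      // highestWriteId; the remaining entries of the group wait for the next cycle.
      static List<QueueEntry> findReadyToClean(List<QueueEntry> readyForCleaning) {
        return readyForCleaning.stream()
            .collect(Collectors.groupingBy(e -> Arrays.asList(e.db(), e.table(), e.partition())))
            .values().stream()
            .map(group -> Collections.min(group, Comparator.comparingLong(QueueEntry::highestWriteId)))
            .sorted(Comparator.comparingLong(QueueEntry::id))
            .collect(Collectors.toList());
      }
    }

The commit implements the same selection in SQL (an inner join against a grouped subquery), so the filtering happens inside the metastore database rather than in Java.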

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7e48419..13705be 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1621,6 +1621,10 @@ public class TestCompactor {
     verifyFooBarResult(tblName, 2);
     verifyHasBase(table.getSd(), fs, "base_0000005_v0000016");
     runCleaner(conf);
+    // When multiple entries have accumulated for the same table/partition, they must be processed one by one,
+    // in ascending order of write_id. To keep the Cleaner multi-threaded, remaining entries of a group are
+    // deferred to the next Cleaner cycle rather than processed concurrently, hence the second runCleaner call.
+    runCleaner(conf);
     verifyDeltaCount(table.getSd(), fs, 0);
   }
 
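The comment above explains the pattern that recurs in TestTxnCommands2, TestTxnCommands3 and TestCleaner below: since each Cleaner cycle now processes only the oldest entry per table/partition group, tests that queue more than one cleanup for the same group have to invoke runCleaner (or startCleaner) once per queued entry.
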
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index ea1abc6..9bfc324 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -51,6 +51,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -196,8 +197,9 @@ public class TestCompactionTxnHandler {
     assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
     CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
     assertNotNull(ci);
-
-    assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
+    
+    ci.highestWriteId = 41;
+    txnHandler.updateCompactorState(ci, 0);
     txnHandler.markCompacted(ci);
     assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
 
@@ -225,8 +227,9 @@ public class TestCompactionTxnHandler {
     assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
     CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
     assertNotNull(ci);
-
-    assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
+    
+    ci.highestWriteId = 41;
+    txnHandler.updateCompactorState(ci, 0);
     txnHandler.markCompacted(ci);
     assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
 
@@ -721,8 +724,9 @@ public class TestCompactionTxnHandler {
   public void testMarkCleanedCleansTxnsAndTxnComponents()
       throws Exception {
     long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
+    long mytableWriteId = allocateTableWriteIds("mydb", "mytable", txnid);
+    
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
     comp.setTablename("mytable");
     comp.setOperationType(DataOperationType.INSERT);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
@@ -746,6 +750,8 @@ public class TestCompactionTxnHandler {
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
 
     txnid = openTxn();
+    long fooWriteId = allocateTableWriteIds("mydb", "foo", txnid);
+    
     comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
     comp.setTablename("foo");
     comp.setPartitionname("bar=compact");
@@ -769,7 +775,7 @@ public class TestCompactionTxnHandler {
     assertTrue(res.getState() == LockState.ACQUIRED);
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
 
-    CompactionInfo ci = new CompactionInfo();
+    CompactionInfo ci;
 
     // Now clean them and check that they are removed from the count.
     CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
@@ -777,8 +783,11 @@ public class TestCompactionTxnHandler {
     assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
     ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
     assertNotNull(ci);
+    
+    ci.highestWriteId = mytableWriteId;
+    txnHandler.updateCompactorState(ci, 0);
     txnHandler.markCompacted(ci);
-
+    
     Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
     List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
     assertEquals(1, toClean.size());
@@ -801,6 +810,9 @@ public class TestCompactionTxnHandler {
     assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
     ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
     assertNotNull(ci);
+
+    ci.highestWriteId = fooWriteId;
+    txnHandler.updateCompactorState(ci, 0);
     txnHandler.markCompacted(ci);
 
     toClean = txnHandler.findReadyToClean(0, 0);
@@ -944,5 +956,12 @@ public class TestCompactionTxnHandler {
     List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
     return txns.get(0);
   }
+  
+  private long allocateTableWriteIds (String dbName, String tblName, long txnid) throws Exception {
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tblName);
+    rqst.setTxnIds(Collections.singletonList(txnid));
+    AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
+    return writeIds.getTxnToWriteIds().get(0).getWriteId();
+  }
 
 }
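
The new allocateTableWriteIds helper exists because the grouping query keys on CQ_HIGHEST_WRITE_ID: an entry marked compacted without a real write id would have nothing meaningful for the MIN(CQ_HIGHEST_WRITE_ID) comparison. Condensed, the sequence the updated tests follow looks like this (all calls taken from the diff above; a sketch, not a complete test):

    long txnid = openTxn();
    long writeId = allocateTableWriteIds("mydb", "mytable", txnid); // new helper, see above
    // ... acquire locks, abort or commit, file a CompactionRequest ...
    CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
    ci.highestWriteId = writeId;             // without this, CQ_HIGHEST_WRITE_ID is never populated
    txnHandler.updateCompactorState(ci, 0);  // persist the write id on the queue row
    txnHandler.markCompacted(ci);            // entry enters the READY_FOR_CLEANING state
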
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e82abdd..4a8bbc0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -806,6 +806,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
     Assert.assertEquals(7, status.length);
     runCleaner(hiveConf);
+    runCleaner(hiveConf);
     // There should be only 1 directory left: base_0000001.
     // Original bucket files, delta directories and previous base directory should have been cleaned up.
     status = fs.listStatus(new Path(getWarehouseDir() + "/" +
@@ -2047,6 +2048,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
     // 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir.
     runCleaner(hiveConf);
+    runCleaner(hiveConf);
     // There should be only 1 directory left: base_xxxxxxx.
     // The delta dirs should have been cleaned up.
     status = fs.listStatus(new Path(getWarehouseDir() + "/" +
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index ec9f5a0..8345832 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -389,6 +389,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     so cleaner removes all files shadowed by it (which is everything in this case)
     */
     runCleaner(hiveConf);
+    runCleaner(hiveConf);
 
     expectedList = new String[] {
         "/t/delta_0000001_0000003_v0000020"
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a1205f4..42c5a04 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -591,6 +591,7 @@ public class TestCleaner extends CompactorTest {
     // unblock the cleaner and run again
     txnHandler.commitTxn(new CommitTxnRequest(blockingTxn));
     startCleaner();
+    startCleaner();
 
     // make sure cleaner removed everything below base_24, and both compactions are successful
     paths = getDirectories(conf, t, p);
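
The final hunk is the core of the fix. Besides moving the JDBC Connection, Statement and ResultSet into try-with-resources blocks (replacing the manual close()/rollbackDBConn() handling), it rewrites the findReadyToClean query: an inner subquery computes MIN(CQ_HIGHEST_WRITE_ID) per (CQ_DATABASE, CQ_TABLE, CQ_PARTITION) group, and the outer query joins back to COMPACTION_QUEUE so that only the row holding that minimum write id is returned for each group, ordered by CQ_ID. Note that the partition join condition must treat two NULL partitions (unpartitioned tables) as equal, hence the explicit OR ... IS NULL AND ... IS NULL clause.
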
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f9d5a7e..4a9a6ef 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -323,57 +323,62 @@ class CompactionTxnHandler extends TxnHandler {
   @Override
   @RetrySemantics.ReadOnly
   public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException {
-    Connection dbConn = null;
-    List<CompactionInfo> rc = new ArrayList<>();
-
-    Statement stmt = null;
-    ResultSet rs = null;
     try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
+      List<CompactionInfo> rc = new ArrayList<>();
+      
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+           Statement stmt = dbConn.createStatement()) {
         /*
          * By filtering on minOpenTxnWaterMark, we will only cleanup after every transaction is committed, that could see
          * the uncompacted deltas. This way the cleaner can clean up everything that was made obsolete by this compaction.
          */
-        String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
-                + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '"
-                + READY_FOR_CLEANING + "'";
+        String whereClause = " WHERE \"CQ_STATE\" = '" + READY_FOR_CLEANING + "'";
         if (minOpenTxnWaterMark > 0) {
-          s = s + " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
+          whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
         }
         if (retentionTime > 0) {
-          s = s + " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")";
+          whereClause += " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")";
         }
-        s = s + " ORDER BY \"CQ_HIGHEST_WRITE_ID\", \"CQ_ID\"";
+        String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," +
+          "   \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\"" +
+          "  FROM \"COMPACTION_QUEUE\" \"cq1\" " +
+          "INNER JOIN (" +
+          "  SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
+          "  FROM \"COMPACTION_QUEUE\"" 
+          + whereClause + 
+          "  GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\" " +
+          "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+
+          "  AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+
+          "  AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" +
+          "    OR \"cq1\".\"CQ_PARTITION\" IS NULL AND \"cq2\".\"CQ_PARTITION\" IS NULL)"
+          + whereClause + 
+          "  AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" +
+          "  ORDER BY \"CQ_ID\"";
         LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
 
-        while (rs.next()) {
-          CompactionInfo info = new CompactionInfo();
-          info.id = rs.getLong(1);
-          info.dbname = rs.getString(2);
-          info.tableName = rs.getString(3);
-          info.partName = rs.getString(4);
-          info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
-          info.runAs = rs.getString(6);
-          info.highestWriteId = rs.getLong(7);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found ready to clean: " + info.toString());
+        try (ResultSet rs = stmt.executeQuery(s)) {
+          while (rs.next()) {
+            CompactionInfo info = new CompactionInfo();
+            info.id = rs.getLong(1);
+            info.dbname = rs.getString(2);
+            info.tableName = rs.getString(3);
+            info.partName = rs.getString(4);
+            info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+            info.runAs = rs.getString(6);
+            info.highestWriteId = rs.getLong(7);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Found ready to clean: " + info.toString());
+            }
+            rc.add(info);
           }
-          rc.add(info);
         }
         return rc;
       } catch (SQLException e) {
         LOG.error("Unable to select next element for cleaning, " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
         checkRetryable(e, "findReadyToClean");
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
+      } 
     } catch (RetryException e) {
       return findReadyToClean(minOpenTxnWaterMark, retentionTime);
     }
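
As a concrete, purely hypothetical example of the new behavior: if the queue holds three READY_FOR_CLEANING entries for the same partition with CQ_ID 10, 11, 12 and CQ_HIGHEST_WRITE_ID 5, 7, 9, the first findReadyToClean call returns only CQ_ID 10; once that entry is marked cleaned, the next call returns CQ_ID 11, and so on. Entries belonging to different tables or partitions are still returned together, so multiple Cleaner threads keep working on distinct groups in parallel, which is the multi-threaded support the commit title restores.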