Posted to commits@hive.apache.org by dk...@apache.org on 2021/12/08 09:00:08 UTC
[hive] branch master updated: HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0e119ea HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)
0e119ea is described below
commit 0e119eaddb93dc10743bb8990ce8eca4fb77cf16
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Wed Dec 8 10:59:48 2021 +0200
HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)
Closes #2825
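In broad terms, the Cleaner polls findReadyToClean() and hands each returned CompactionInfo to a worker pool; this fix makes that safe by ensuring the query never returns two entries for the same table/partition in one cycle. A minimal sketch of that dispatch pattern, assuming a hypothetical clean(ci) helper and pool size (this is not the actual Cleaner code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical dispatch loop: safe to parallelize because findReadyToClean()
    // now returns at most one entry per table/partition per cycle.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<>();
    for (CompactionInfo ci : txnHandler.findReadyToClean(minOpenTxnWaterMark, retentionTime)) {
      futures.add(pool.submit(() -> clean(ci)));  // clean(ci) is a placeholder for the per-entry work
    }
    for (Future<?> f : futures) {
      f.get();  // wait for this cycle to finish before polling again
    }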
---
.../hive/ql/txn/compactor/TestCompactor.java | 4 ++
.../metastore/txn/TestCompactionTxnHandler.java | 35 ++++++++---
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 +
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 1 +
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 1 +
.../hive/metastore/txn/CompactionTxnHandler.java | 69 ++++++++++++----------
6 files changed, 72 insertions(+), 40 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7e48419..13705be 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1621,6 +1621,10 @@ public class TestCompactor {
verifyFooBarResult(tblName, 2);
verifyHasBase(table.getSd(), fs, "base_0000005_v0000016");
runCleaner(conf);
+ // When multiple entries have accumulated for the same table/partition, they must be processed
+ // one by one in ascending write_id order. To keep the Cleaner multi-threaded, all but the first
+ // entry of such a group are deferred to the next Cleaner cycle, so that entries from the same
+ // group are never processed by multiple threads concurrently.
+ runCleaner(conf);
verifyDeltaCount(table.getSd(), fs, 0);
}
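The doubled runCleaner(conf) above is a consequence of the new grouping: with two READY_FOR_CLEANING entries queued for the same table, findReadyToClean() returns only the one with the lowest CQ_HIGHEST_WRITE_ID, and the second entry is picked up on the next cycle. A self-contained sketch of that per-cycle selection (the Entry record is a hypothetical stand-in for a COMPACTION_QUEUE row, not Hive code):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.stream.Collectors;

    class CleanerGrouping {
      // hypothetical stand-in for a COMPACTION_QUEUE row
      record Entry(long id, String db, String table, String part, long highestWriteId) {}

      // Keep only the lowest-write-id entry of each db/table/partition group, mirroring
      // the MIN("CQ_HIGHEST_WRITE_ID") ... GROUP BY join that findReadyToClean() now issues.
      static List<Entry> readyThisCycle(List<Entry> queue) {
        return queue.stream()
            .collect(Collectors.groupingBy(
                e -> e.db() + "." + e.table() + "/" + e.part(),  // null part maps to "null"; fine for a sketch
                Collectors.minBy(Comparator.comparingLong(Entry::highestWriteId))))
            .values().stream()
            .flatMap(Optional::stream)
            .sorted(Comparator.comparingLong(Entry::id))  // ORDER BY "CQ_ID"
            .collect(Collectors.toList());
      }
    }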
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index ea1abc6..9bfc324 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -51,6 +51,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -196,8 +197,9 @@ public class TestCompactionTxnHandler {
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
-
- assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
+
+ ci.highestWriteId = 41;
+ txnHandler.updateCompactorState(ci, 0);
txnHandler.markCompacted(ci);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
@@ -225,8 +227,9 @@ public class TestCompactionTxnHandler {
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
-
- assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
+
+ ci.highestWriteId = 41;
+ txnHandler.updateCompactorState(ci, 0);
txnHandler.markCompacted(ci);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
@@ -721,8 +724,9 @@ public class TestCompactionTxnHandler {
public void testMarkCleanedCleansTxnsAndTxnComponents()
throws Exception {
long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
+ long mytableWriteId = allocateTableWriteIds("mydb", "mytable", txnid);
+
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
comp.setTablename("mytable");
comp.setOperationType(DataOperationType.INSERT);
List<LockComponent> components = new ArrayList<LockComponent>(1);
@@ -746,6 +750,8 @@ public class TestCompactionTxnHandler {
txnHandler.abortTxn(new AbortTxnRequest(txnid));
txnid = openTxn();
+ long fooWriteId = allocateTableWriteIds("mydb", "foo", txnid);
+
comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
comp.setTablename("foo");
comp.setPartitionname("bar=compact");
@@ -769,7 +775,7 @@ public class TestCompactionTxnHandler {
assertTrue(res.getState() == LockState.ACQUIRED);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
- CompactionInfo ci = new CompactionInfo();
+ CompactionInfo ci;
// Now clean them and check that they are removed from the count.
CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
@@ -777,8 +783,11 @@ public class TestCompactionTxnHandler {
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
+
+ ci.highestWriteId = mytableWriteId;
+ txnHandler.updateCompactorState(ci, 0);
txnHandler.markCompacted(ci);
-
+
Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
@@ -801,6 +810,9 @@ public class TestCompactionTxnHandler {
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
+
+ ci.highestWriteId = fooWriteId;
+ txnHandler.updateCompactorState(ci, 0);
txnHandler.markCompacted(ci);
toClean = txnHandler.findReadyToClean(0, 0);
@@ -944,5 +956,12 @@ public class TestCompactionTxnHandler {
List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
return txns.get(0);
}
+
+ private long allocateTableWriteIds (String dbName, String tblName, long txnid) throws Exception {
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tblName);
+ rqst.setTxnIds(Collections.singletonList(txnid));
+ AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
+ return writeIds.getTxnToWriteIds().get(0).getWriteId();
+ }
}
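The extra allocateTableWriteIds/updateCompactorState calls in these tests follow from the new query: findReadyToClean() now matches entries on CQ_HIGHEST_WRITE_ID, so a queue entry whose write id was never recorded is not returned. The sequence the tests adopt, condensed from the hunks above:

    CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
    ci.highestWriteId = allocateTableWriteIds("mydb", "mytable", txnid);  // record a real write id
    txnHandler.updateCompactorState(ci, 0);  // persist CQ_HIGHEST_WRITE_ID before marking compacted
    txnHandler.markCompacted(ci);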
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e82abdd..4a8bbc0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -806,6 +806,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
// 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
Assert.assertEquals(7, status.length);
runCleaner(hiveConf);
+ runCleaner(hiveConf);
// There should be only 1 directory left: base_0000001.
// Original bucket files, delta directories and previous base directory should have been cleaned up.
status = fs.listStatus(new Path(getWarehouseDir() + "/" +
@@ -2047,6 +2048,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
// 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir.
runCleaner(hiveConf);
+ runCleaner(hiveConf);
// There should be only 1 directory left: base_xxxxxxx.
// The delta dirs should have been cleaned up.
status = fs.listStatus(new Path(getWarehouseDir() + "/" +
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index ec9f5a0..8345832 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -389,6 +389,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
so cleaner removes all files shadowed by it (which is everything in this case)
*/
runCleaner(hiveConf);
+ runCleaner(hiveConf);
expectedList = new String[] {
"/t/delta_0000001_0000003_v0000020"
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a1205f4..42c5a04 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -591,6 +591,7 @@ public class TestCleaner extends CompactorTest {
// unblock the cleaner and run again
txnHandler.commitTxn(new CommitTxnRequest(blockingTxn));
startCleaner();
+ startCleaner();
// make sure cleaner removed everything below base_24, and both compactions are successful
paths = getDirectories(conf, t, p);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f9d5a7e..4a9a6ef 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -323,57 +323,62 @@ class CompactionTxnHandler extends TxnHandler {
@Override
@RetrySemantics.ReadOnly
public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException {
- Connection dbConn = null;
- List<CompactionInfo> rc = new ArrayList<>();
-
- Statement stmt = null;
- ResultSet rs = null;
try {
- try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
+ List<CompactionInfo> rc = new ArrayList<>();
+
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Statement stmt = dbConn.createStatement()) {
/*
* By filtering on minOpenTxnWaterMark, we only clean up after every transaction that could see
* the uncompacted deltas has committed. This way the cleaner can remove everything that was made
* obsolete by this compaction.
*/
- String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
- + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '"
- + READY_FOR_CLEANING + "'";
+ String whereClause = " WHERE \"CQ_STATE\" = '" + READY_FOR_CLEANING + "'";
if (minOpenTxnWaterMark > 0) {
- s = s + " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
+ whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
}
if (retentionTime > 0) {
- s = s + " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")";
+ whereClause += " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")";
}
- s = s + " ORDER BY \"CQ_HIGHEST_WRITE_ID\", \"CQ_ID\"";
+ String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," +
+ " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\"" +
+ " FROM \"COMPACTION_QUEUE\" \"cq1\" " +
+ "INNER JOIN (" +
+ " SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
+ " FROM \"COMPACTION_QUEUE\""
+ + whereClause +
+ " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\" " +
+ "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+
+ " AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+
+ " AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" +
+ " OR \"cq1\".\"CQ_PARTITION\" IS NULL AND \"cq2\".\"CQ_PARTITION\" IS NULL)"
+ + whereClause +
+ " AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" +
+ " ORDER BY \"CQ_ID\"";
LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.id = rs.getLong(1);
- info.dbname = rs.getString(2);
- info.tableName = rs.getString(3);
- info.partName = rs.getString(4);
- info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
- info.runAs = rs.getString(6);
- info.highestWriteId = rs.getLong(7);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found ready to clean: " + info.toString());
+ try (ResultSet rs = stmt.executeQuery(s)) {
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+ info.runAs = rs.getString(6);
+ info.highestWriteId = rs.getLong(7);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found ready to clean: " + info.toString());
+ }
+ rc.add(info);
}
- rc.add(info);
}
return rc;
} catch (SQLException e) {
LOG.error("Unable to select next element for cleaning, " + e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
checkRetryable(e, "findReadyToClean");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
- } finally {
- close(rs, stmt, dbConn);
- }
+ }
} catch (RetryException e) {
return findReadyToClean(minOpenTxnWaterMark, retentionTime);
}