You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by as...@apache.org on 2022/01/19 15:21:30 UTC
[hive] branch master updated: HIVE-24805: Compactor: Initiator shouldn't fetch table details again and again for partitioned tables (Antal Sinkovits, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 718e19e HIVE-24805: Compactor: Initiator shouldn't fetch table details again and again for partitioned tables (Antal Sinkovits, reviewed by Denys Kuzmenko)
718e19e is described below
commit 718e19ecabf09c4e11b1a4addd9ab78f12187961
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Wed Jan 19 16:21:10 2022 +0100
HIVE-24805: Compactor: Initiator shouldn't fetch table details again and again for partitioned tables (Antal Sinkovits, reviewed by Denys Kuzmenko)
Closes #2906
---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 29 ++++++++++++--
.../hive/ql/txn/compactor/TestInitiator.java | 46 ++++++++++++++++++++++
.../hadoop/hive/metastore/conf/MetastoreConf.java | 3 ++
3 files changed, 75 insertions(+), 3 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5b31d97..880e5e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -68,7 +70,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -90,6 +94,7 @@ public class Initiator extends MetaStoreCompactorThread {
private long checkInterval;
private ExecutorService compactionExecutor;
+ private Optional<Cache<String, Table>> tableCache = Optional.empty();
@Override
public void run() {
@@ -142,6 +147,9 @@ public class Initiator extends MetaStoreCompactorThread {
final ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
+ // Currently we invalidate all entries after each cycle, because the bootstrap replication is marked via
+ // table property hive.repl.first.inc.pending which would be cached.
+ tableCache.ifPresent(c -> c.invalidateAll());
Set<String> skipDBs = Sets.newConcurrentHashSet();
Set<String> skipTables = Sets.newConcurrentHashSet();
@@ -164,7 +172,7 @@ public class Initiator extends MetaStoreCompactorThread {
for (CompactionInfo ci : potentials) {
try {
- Table t = resolveTable(ci);
+ Table t = resolveTableAndCache(ci);
Partition p = resolvePartition(ci);
if (p == null && ci.partName != null) {
LOG.info("Can't find partition " + ci.getFullPartitionName() +
@@ -242,6 +250,17 @@ public class Initiator extends MetaStoreCompactorThread {
}
}
+  /**
+   * Resolves the table of a compaction candidate, going through the per-cycle table cache when
+   * {@code metastore.compactor.initiator.tablecache.on} is enabled.
+   *
+   * @param ci compaction candidate whose table should be resolved
+   * @return the table, or {@code null} when {@link #resolveTable(CompactionInfo)} cannot find it
+   *         (callers treat that as a temp or dropped table); a missing table is never cached
+   * @throws MetaException if the metastore lookup fails
+   */
+  private Table resolveTableAndCache(CompactionInfo ci) throws MetaException {
+    if (!tableCache.isPresent()) {
+      return resolveTable(ci);
+    }
+    try {
+      // The loader passed to Cache#get must not return null; Guava reports a null result as
+      // InvalidCacheLoadException rather than returning it.
+      return tableCache.get().get(ci.getFullTableName(), () -> resolveTable(ci));
+    } catch (com.google.common.cache.CacheLoader.InvalidCacheLoadException e) {
+      // resolveTable returned null (table not found): keep the uncached contract and return null.
+      return null;
+    } catch (ExecutionException e) {
+      // resolveTable only declares MetaException, so that is the expected checked cause.
+      Throwable cause = e.getCause();
+      if (cause instanceof MetaException) {
+        throw (MetaException) cause;
+      }
+      MetaException wrapped =
+          new MetaException("Failed to resolve table " + ci.getFullTableName() + ": " + cause);
+      wrapped.initCause(cause);
+      throw wrapped;
+    }
+  }
+
private ValidWriteIdList resolveValidWriteIds(Table t) throws NoSuchTxnException, MetaException {
ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
// The response will have one entry per table and hence we get only one ValidWriteIdList
@@ -275,6 +294,11 @@ public class Initiator extends MetaStoreCompactorThread {
compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
+ boolean tableCacheOn = MetastoreConf.getBoolVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
+ if (tableCacheOn) {
+ this.tableCache = Optional.of(CacheBuilder.newBuilder().softValues().build());
+ }
}
private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
@@ -524,8 +548,7 @@ public class Initiator extends MetaStoreCompactorThread {
return false;
}
- //TODO: avoid repeated HMS lookup for same table (e.g partitions within table)
- Table t = resolveTable(ci);
+ Table t = resolveTableAndCache(ci);
if (t == null) {
LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
"table or has been dropped and moving on.");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e3ff82e..a07427e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -58,6 +58,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import org.mockito.Mockito;
+import static org.mockito.Mockito.times;
+
/**
* Tests for the compactor Initiator thread.
*/
@@ -1069,6 +1071,50 @@ public class TestInitiator extends CompactorTest {
Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
}
+  /**
+   * Verifies the Initiator table cache: two partitions of the same table need compaction and both
+   * get queued, yet the table is resolved from the metastore only once during the cycle.
+   */
+  @Test
+  public void testMetaCache() throws Exception {
+    String dbname = "default";
+    String tableName = "tmc";
+    Table t = newTable(dbname, tableName, true);
+    List<LockComponent> components = new ArrayList<>();
+
+    // Give each partition a base plus two deltas; the test expects one compaction per partition.
+    for (int i = 0; i < 2; i++) {
+      Partition p = newPartition(t, "part" + (i + 1));
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, dbname);
+      comp.setTablename(tableName);
+      comp.setPartitionname("ds=part" + (i + 1));
+      comp.setOperationType(DataOperationType.UPDATE);
+      components.add(comp);
+    }
+    burnThroughTransactions(dbname, tableName, 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId(dbname, tableName, txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE, 3);
+    // The single-lookup verification below only holds with the cache on; set it explicitly instead
+    // of relying on the configuration default.
+    org.apache.hadoop.hive.metastore.conf.MetastoreConf.setBoolVar(conf,
+        org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON, true);
+    Initiator initiator = Mockito.spy(new Initiator());
+    initiator.setThreadId((int) t.getId());
+    initiator.setConf(conf);
+    initiator.init(new AtomicBoolean(true));
+    initiator.run();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(2, compacts.size());
+    for (ShowCompactResponseElement compact : compacts) {
+      Assert.assertEquals(tableName, compact.getTablename());
+    }
+    // Both partitions were processed, but the table was fetched from the metastore only once.
+    Mockito.verify(initiator, times(1)).resolveTable(Mockito.any());
+  }
+
private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
FindNextCompactRequest request = new FindNextCompactRequest();
request.setWorkerId(workerId);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 2feff8c..6f682ba 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -572,6 +572,9 @@ public class MetastoreConf {
"Set this to true on one instance of the Thrift metastore service as part of turning\n" +
"on Hive transactions. For a complete list of parameters required for turning on\n" +
"transactions, see hive.txn.manager."),
+ COMPACTOR_INITIATOR_TABLECACHE_ON("metastore.compactor.initiator.tablecache.on",
+ "hive.compactor.initiator.tablecache.on", true,
+ "Enable table caching in the initiator. Currently the cache is cleaned after each cycle."),
COMPACTOR_WORKER_THREADS("metastore.compactor.worker.threads",
"hive.compactor.worker.threads", 0,
"How many compactor worker threads to run on this metastore instance. Set this to a\n" +