You are viewing a plain text version of this content. The canonical (HTML) version is available in the mailing-list archive; its link is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by as...@apache.org on 2022/01/19 15:21:30 UTC

[hive] branch master updated: HIVE-24805: Compactor: Initiator shouldn't fetch table details again and again for partitioned tables (Antal Sinkovits, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 718e19e  HIVE-24805: Compactor: Initiator shouldn't fetch table details again and again for partitioned tables (Antal Sinkovits, reviewed by Denys Kuzmenko)
718e19e is described below

commit 718e19ecabf09c4e11b1a4addd9ab78f12187961
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Wed Jan 19 16:21:10 2022 +0100

    HIVE-24805: Compactor: Initiator shouldn't fetch table details again and again for partitioned tables (Antal Sinkovits, reviewed by Denys Kuzmenko)
    
    Closes  #2906
---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 29 ++++++++++++--
 .../hive/ql/txn/compactor/TestInitiator.java       | 46 ++++++++++++++++++++++
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  3 ++
 3 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5b31d97..880e5e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,7 +70,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -90,6 +94,7 @@ public class Initiator extends MetaStoreCompactorThread {
 
   private long checkInterval;
   private ExecutorService compactionExecutor;
+  private Optional<Cache<String, Table>> tableCache = Optional.empty();
 
   @Override
   public void run() {
@@ -142,6 +147,9 @@ public class Initiator extends MetaStoreCompactorThread {
 
           final ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
 
+          // Currently we invalidate all entries after each cycle, because the bootstrap replication is marked via
+          // table property hive.repl.first.inc.pending which would be cached.
+          tableCache.ifPresent(c -> c.invalidateAll());
           Set<String> skipDBs = Sets.newConcurrentHashSet();
           Set<String> skipTables = Sets.newConcurrentHashSet();
 
@@ -164,7 +172,7 @@ public class Initiator extends MetaStoreCompactorThread {
 
           for (CompactionInfo ci : potentials) {
             try {
-              Table t = resolveTable(ci);
+              Table t = resolveTableAndCache(ci);
               Partition p = resolvePartition(ci);
               if (p == null && ci.partName != null) {
                 LOG.info("Can't find partition " + ci.getFullPartitionName() +
@@ -242,6 +250,17 @@ public class Initiator extends MetaStoreCompactorThread {
     }
   }
 
+  private Table resolveTableAndCache(CompactionInfo ci) throws MetaException {
+    try { // resolveTable() returns null for a missing/temp table, and Guava's Cache.get() rejects null loads
+      return tableCache.isPresent()
+          ? tableCache.get().get(ci.getFullTableName(), () -> resolveTable(ci)) : resolveTable(ci);
+    } catch (com.google.common.cache.CacheLoader.InvalidCacheLoadException e) {
+      return null; // nothing is cached for this key, so callers see the same null as the uncached path
+    } catch (ExecutionException e) {
+      throw (MetaException) e.getCause(); // NOTE(review): assumes resolveTable() only throws MetaException
+    }
+  }
+
   private ValidWriteIdList resolveValidWriteIds(Table t) throws NoSuchTxnException, MetaException {
     ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
     // The response will have one entry per table and hence we get only one ValidWriteIdList
@@ -275,6 +294,11 @@ public class Initiator extends MetaStoreCompactorThread {
     compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
             COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
+    boolean tableCacheOn = MetastoreConf.getBoolVar(conf,
+        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
+    if (tableCacheOn) {
+      this.tableCache = Optional.of(CacheBuilder.newBuilder().softValues().build());
+    }
   }
 
   private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
@@ -524,8 +548,7 @@ public class Initiator extends MetaStoreCompactorThread {
         return false;
       }
 
-      //TODO: avoid repeated HMS lookup for same table (e.g partitions within table)
-      Table t = resolveTable(ci);
+      Table t = resolveTableAndCache(ci);
       if (t == null) {
         LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
             "table or has been dropped and moving on.");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e3ff82e..a07427e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -58,6 +58,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import org.mockito.Mockito;
 
+import static org.mockito.Mockito.times;
+
 /**
  * Tests for the compactor Initiator thread.
  */
@@ -1069,6 +1071,50 @@ public class TestInitiator extends CompactorTest {
     Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
   }
 
+  @Test
+  public void testMetaCache() throws Exception { // two partitions of one table -> one resolveTable() call
+    String dbname = "default";
+    String tableName = "tmc";
+    Table t = newTable(dbname, tableName, true);
+    List<LockComponent> components = new ArrayList<>();
+    // Each partition gets a base file, two delta files and a SHARED_WRITE lock component.
+    for (int i = 0; i < 2; i++) {
+      Partition p = newPartition(t, "part" + (i + 1));
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, dbname);
+      comp.setTablename(tableName);
+      comp.setPartitionname("ds=part" + (i + 1));
+      comp.setOperationType(DataOperationType.UPDATE);
+      components.add(comp);
+    }
+    burnThroughTransactions(dbname, tableName, 23);
+    long txnid = openTxn();
+    // Lock both partitions in one transaction, allocate write id 24 and commit it.
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId(dbname, tableName, txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+    // NOTE(review): relies on COMPACTOR_INITIATOR_TABLECACHE_ON defaulting to true; times(1) below needs it.
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE, 3);
+    Initiator initiator = Mockito.spy(new Initiator());
+    initiator.setThreadId((int) t.getId());
+    initiator.setConf(conf);
+    initiator.init(new AtomicBoolean(true));
+    initiator.run();
+    // Expect one compaction request per partition, yet only a single resolveTable() lookup for the table.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(2, compacts.size());
+    Mockito.verify(initiator, times(1)).resolveTable(Mockito.any());
+  }
+
   private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
     FindNextCompactRequest request = new FindNextCompactRequest();
     request.setWorkerId(workerId);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 2feff8c..6f682ba 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -572,6 +572,9 @@ public class MetastoreConf {
             "Set this to true on one instance of the Thrift metastore service as part of turning\n" +
             "on Hive transactions. For a complete list of parameters required for turning on\n" +
             "transactions, see hive.txn.manager."),
+    COMPACTOR_INITIATOR_TABLECACHE_ON("metastore.compactor.initiator.tablecache.on",
+      "hive.compactor.initiator.tablecache.on", true,
+      "Enable table caching in the initiator. Currently the cache is cleaned after each cycle."),
     COMPACTOR_WORKER_THREADS("metastore.compactor.worker.threads",
         "hive.compactor.worker.threads", 0,
         "How many compactor worker threads to run on this metastore instance. Set this to a\n" +