You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/03/31 21:39:29 UTC

[impala] 01/02: IMPALA-11181: Improving performance of compaction checking

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 38efbee29d1d225d0a68eb5a6216325e12e81acd
Author: Yu-Wen Lai <yu...@cloudera.com>
AuthorDate: Tue Mar 15 20:12:52 2022 -0700

    IMPALA-11181: Improving performance of compaction checking
    
    After HIVE-25753, we no longer need to explicitly set all partition
    names to get the latest compaction id. In addition, we can send the
    last compaction id to HMS so that HMS sends back compaction info
    only if there are newer compactions. This avoids unnecessary data
    transfer between HMS and Catalogd.
    
    Testing:
    existing tests
    
    Change-Id: I32e30ec418ad09bef862e61163539a910c96c44c
    Reviewed-on: http://gerrit.cloudera.org:8080/18324
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    | 39 +++++++++++-----------
 .../java/org/apache/impala/catalog/HdfsTable.java  | 12 +++++++
 .../impala/catalog/local/DirectMetaProvider.java   |  5 ---
 3 files changed, 31 insertions(+), 25 deletions(-)

diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 0829106..37f35ab 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -108,6 +108,7 @@ import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Metrics;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
@@ -609,19 +610,12 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
   public static List<HdfsPartition.Builder> getPartitionsForRefreshingFileMetadata(
       CatalogServiceCatalog catalog, HdfsTable hdfsTable) throws CatalogException {
     List<HdfsPartition.Builder> partBuilders = new ArrayList<>();
-    List<HdfsPartition> hdfsPartitions = hdfsTable.getPartitions()
-        .stream()
-        .map(p -> (HdfsPartition) p)
-        .collect(Collectors.toList());
     // fetch the latest compaction info from HMS
     GetLatestCommittedCompactionInfoRequest request =
         new GetLatestCommittedCompactionInfoRequest(
             hdfsTable.getDb().getName(), hdfsTable.getName());
-    if (hdfsTable.isPartitioned()) {
-      List<String> partNames = hdfsPartitions.stream()
-          .map(HdfsPartition::getPartitionName)
-          .collect(Collectors.toList());
-      request.setPartitionnames(partNames);
+    if (hdfsTable.getLastCompactionId() > 0) {
+      request.setLastCompactionId(hdfsTable.getLastCompactionId());
     }
 
     GetLatestCommittedCompactionInfoResponse response;
@@ -645,12 +639,9 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       }
     }
 
-    for (HdfsPartition partition : hdfsPartitions) {
-      long latestCompactionId =
-          partNameToCompactionId.getOrDefault(partition.getPartitionName(), -1L);
-      if (partition.getLastCompactionId() >= latestCompactionId) {
-        continue;
-      }
+    for (HdfsPartition partition : hdfsTable.getPartitionsForNames(
+        partNameToCompactionId.keySet())) {
+      long latestCompactionId = partNameToCompactionId.get(partition.getPartitionName());
       HdfsPartition.Builder builder = new HdfsPartition.Builder(partition);
       LOG.debug(
           "Cached compaction id for {} partition {}: {} but the latest compaction id: {}",
@@ -672,6 +663,10 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       throws TException {
     Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
     Preconditions.checkNotNull(metas, "Partition map must be non-null");
+    if (metas.isEmpty()) {
+      return Collections.emptyList();
+    }
+    Stopwatch sw = Stopwatch.createStarted();
     List<PartitionRef> stalePartitions = new ArrayList<>();
     if (!table.isTransactional() || metas.isEmpty()) return stalePartitions;
     GetLatestCommittedCompactionInfoRequest request =
@@ -680,13 +675,17 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       request.setPartitionnames(metas.keySet().stream()
           .map(PartitionRef::getName).collect(Collectors.toList()));
     }
+    long lastCompactionId = metas.values().stream()
+        .mapToLong(p -> p.getLastCompactionId()).max().orElse(-1);
+    if (lastCompactionId > 0) {
+      request.setLastCompactionId(lastCompactionId);
+    }
+
     GetLatestCommittedCompactionInfoResponse response;
     try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
       response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
     }
     Map<String, Long> partNameToCompactionId = new HashMap<>();
-    // If the table is partitioned, we must set partition name, otherwise empty result
-    // will be returned.
     if (table.isPartitioned()) {
       for (CompactionInfoStruct ci : response.getCompactions()) {
         partNameToCompactionId.put(
@@ -703,13 +702,13 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         metas.entrySet().iterator();
     while (iter.hasNext()) {
       Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
-      long latestCompactionId = partNameToCompactionId.getOrDefault(
-          entry.getKey().getName(), -1L);
-      if (entry.getValue().getLastCompactionId() < latestCompactionId) {
+      if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
         stalePartitions.add(entry.getKey());
         iter.remove();
       }
     }
+    LOG.debug("Checked the latest compaction info for {}.{}. Time taken: {}", dbName,
+        tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
     return stalePartitions;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 95917a1..5867b32 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -292,6 +292,13 @@ public class HdfsTable extends Table implements FeFsTable {
   // null in the case that this table is not transactional.
   protected MutableValidWriteIdList validWriteIds_ = null;
 
+  // The last committed compaction id in the table level. It will be sent as a filter to
+  // retrieve only the latest compaction that is not seen by this instance. This value is
+  // updated whenever a partition is added to the table so that it is guaranteed to be
+  // up-to-date.
+  // -1 means there is no previous compaction event or compaction is not supported.
+  private long lastCompactionId_ = -1;
+
   // Partitions are marked as "dirty" indicating there are in-progress modifications on
   // their metadata. The corresponding partition builder contains the new version of the
   // metadata so represents the in-progress modifications. The modifications will be
@@ -967,6 +974,7 @@ public class HdfsTable extends Table implements FeFsTable {
     fileMetadataStats_.totalFileBytes += partition.getSize();
     fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
     updatePartitionMdAndColStats(partition);
+    lastCompactionId_ = Math.max(lastCompactionId_, partition.getLastCompactionId());
     return true;
   }
 
@@ -3015,6 +3023,10 @@ public class HdfsTable extends Table implements FeFsTable {
     }
   }
 
+  public long getLastCompactionId() {
+    return lastCompactionId_;
+  }
+
   /**
    * Updates the pending version of this table if the tbl version matches with the
    * expectedTblVersion.
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 41c7d15..8b741fc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.Path;
@@ -55,7 +54,6 @@ import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.SqlConstraints;
 import org.apache.impala.common.Pair;
-import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TBackendGflags;
@@ -535,13 +533,10 @@ class DirectMetaProvider implements MetaProvider {
       TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException {
     Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
     Preconditions.checkNotNull(metas, "Partition map must be non-null");
-    Stopwatch sw = Stopwatch.createStarted();
 
     List<PartitionRef> stalePartitions = MetastoreShim.checkLatestCompaction(
         msClientPool_, dbName, tableName, table, metas,
         PartitionRefImpl.UNPARTITIONED_NAME);
-    LOG.debug("Checked the latest compaction id for {}.{} Time taken: {}", dbName,
-        tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
     return stalePartitions;
   }