You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/12/03 02:21:16 UTC

[impala] 01/03: IMPALA-11032: Automatic Refresh of Metadata for Local Catalog after Compaction

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4077bc849ae14bb92a463aeeb6c8f5c1fca658c9
Author: Yu-Wen Lai <yu...@cloudera.com>
AuthorDate: Fri Nov 19 10:20:08 2021 -0800

    IMPALA-11032: Automatic Refresh of Metadata for Local Catalog after
    Compaction
    
    After a compaction runs in Hive (on a Hive ACID table), queries made in
    Impala may fail with a FileNotFoundException if the files they reference
    have already been removed by the Hive cleaner.
    
    In IMPALA-10801, catalogd checks the latest compaction id before serving
    metadata. However, coordinators don't take advantage of that check.
    Coordinators have their own local cache, so we have to do the same
    check on coordinators as well. In addition, we need to attach the
    writeIdList to requests that fetch file metadata. Since this check adds
    overhead to queries, we introduce a flag, auto_check_compaction, and
    set it to false by default for now. We will look for more efficient
    ways to do compaction checking in the future.
    
    Tests:
    Added unit tests to CatalogdMetaProviderTest
    
    Change-Id: I173ea848917b6a41139b25b80677111463bfdc4b
    Reviewed-on: http://gerrit.cloudera.org:8080/18043
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-server.cc                    |   6 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 common/thrift/CatalogService.thrift                |   3 +
 .../impala/catalog/CompactionInfoLoader.java       |  18 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |   1 +
 .../impala/catalog/local/CatalogdMetaProvider.java |  41 ++++-
 .../impala/catalog/local/DirectMetaProvider.java   |  82 ++++++++-
 .../apache/impala/catalog/local/MetaProvider.java  |   3 +
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../java/org/apache/impala/util/AcidUtils.java     |   9 +-
 .../catalog/local/CatalogdMetaProviderTest.java    | 199 +++++++++++++++++++++
 12 files changed, 356 insertions(+), 14 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 41bf355..405a72c 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -344,6 +344,12 @@ DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
     "admission service, if enabled. Heartbeats are used to ensure resources are properly "
     "accounted for even if rpcs to the admission service occasionally fail.");
 
+DEFINE_bool(auto_check_compaction, false,
+    "When true, compaction checking will be conducted for each query in local catalog "
+    "mode. Note that this checking introduces additional overhead because Impala makes "
+    "additional RPCs to hive metastore for each table in a query during the query "
+    "compilation.");
+
 // Flags for JWT token based authentication.
 DECLARE_bool(jwt_token_auth);
 DECLARE_bool(jwt_validate_signature);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f210ddb..206b7fc 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -99,6 +99,7 @@ DECLARE_bool(enable_catalogd_hms_cache);
 DECLARE_string(kudu_sasl_protocol_name);
 DECLARE_bool(invalidate_hms_cache_on_ddls);
 DECLARE_bool(hms_event_incremental_refresh_transactional_table);
+DECLARE_bool(auto_check_compaction);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -319,6 +320,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
       FLAGS_startup_filesystem_check_directories);
   cfg.__set_hms_event_incremental_refresh_transactional_table(
       FLAGS_hms_event_incremental_refresh_transactional_table);
+  cfg.__set_auto_check_compaction(FLAGS_auto_check_compaction);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 198f1f2..dcd0a24 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -219,4 +219,6 @@ struct TBackendGflags {
   97: required bool hms_event_incremental_refresh_transactional_table
 
   98: required bool enable_shell_based_groups_mapping_support
+
+  99: required bool auto_check_compaction
 }
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 1b6ab30..886c41b 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -414,6 +414,9 @@ struct TPartialPartitionInfo {
   // Set if 'want_partition_files' was set in TTableInfoSelector.
   9: optional list<CatalogObjects.THdfsFileDesc> delete_file_descriptors
 
+  // Set if 'want_partition_files' was set in TTableInfoSelector.
+  14: optional i64 last_compaction_id
+
   // Deflate-compressed byte[] representation of TPartitionStats for this partition.
   // Set if 'want_partition_stats' was set in TTableInfoSelector. Not set if the
   // partition does not have stats.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CompactionInfoLoader.java b/fe/src/main/java/org/apache/impala/catalog/CompactionInfoLoader.java
index 2e14678..b673e58 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CompactionInfoLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CompactionInfoLoader.java
@@ -17,8 +17,10 @@
 
 package org.apache.impala.catalog;
 
+import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
 import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
+import org.apache.thrift.TException;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,14 +37,12 @@ public class CompactionInfoLoader {
       new ConcurrentHashMap<>();
 
   public static GetLatestCommittedCompactionInfoResponse getLatestCompactionInfo(
-      CatalogServiceCatalog catalog, GetLatestCommittedCompactionInfoRequest request)
-      throws CatalogException {
+      MetaStoreClientPool.MetaStoreClient client,
+      GetLatestCommittedCompactionInfoRequest request)
+      throws TException {
     FutureTask<GetLatestCommittedCompactionInfoResponse> reqTask =
         new FutureTask<>(() -> {
-          try (
-              MetaStoreClientPool.MetaStoreClient client = catalog.getMetaStoreClient()) {
-            return client.getHiveClient().getLatestCommittedCompactionInfo(request);
-          }
+          return client.getHiveClient().getLatestCommittedCompactionInfo(request);
         });
 
     FutureTask<GetLatestCommittedCompactionInfoResponse> existingTask =
@@ -56,8 +56,10 @@ public class CompactionInfoLoader {
     try {
       return reqTask.get();
     } catch (Exception e) {
-      throw new CatalogException("Error getting latest compaction info for "
-              + request.getDbname() + "." + request.getTablename(), e);
+      Throwables.propagateIfPossible(e.getCause(), TException.class);
+      // The HMS request should only throw TException, we won't get any other exceptions
+      // here. If for some reason we do, just rethrow as RTE.
+      throw new RuntimeException(e);
     } finally {
       requests_.remove(request, reqTask);
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index afc084e..c48d14e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2158,6 +2158,7 @@ public class HdfsTable extends Table implements FeFsTable {
         }
 
         if (req.table_info_selector.want_partition_files) {
+          partInfo.setLast_compaction_id(part.getLastCompactionId());
           try {
             if (!part.getInsertFileDescriptors().isEmpty()) {
               partInfo.file_descriptors = new ArrayList<>();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 70f1116..7dff758 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableCollection;
@@ -65,6 +66,7 @@ import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.catalog.SqlConstraints;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.service.MetadataOp;
@@ -93,6 +95,7 @@ import org.apache.impala.thrift.TUnit;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.thrift.TValidWriteIdList;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.ListMap;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -895,6 +898,18 @@ public class CatalogdMetaProvider implements MetaProvider {
     // Load what we can from the cache.
     Map<PartitionRef, PartitionMetadata> refToMeta = loadPartitionsFromCache(refImpl,
         hostIndex, partitionRefs);
+    if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
+      // If any partitions are stale after compaction, they will be removed from refToMeta
+      List<PartitionRef> stalePartitions = directProvider_.checkLatestCompaction(
+          refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
+      cache_.invalidateAll(stalePartitions.stream()
+          .map(PartitionRefImpl.class::cast)
+          .map(PartitionRefImpl::getId)
+          .map(PartitionCacheKey::new)
+          .collect(Collectors.toList()));
+      LOG.debug("Checked the latest compaction id for {}.{}", refImpl.dbName_,
+          refImpl.tableName_);
+    }
 
     final int numHits = refToMeta.size();
     final int numMisses = partitionRefs.size() - numHits;
@@ -941,6 +956,9 @@ public class CatalogdMetaProvider implements MetaProvider {
     req.table_info_selector.partition_ids = ids;
     req.table_info_selector.want_partition_metadata = true;
     req.table_info_selector.want_partition_files = true;
+    if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
+      req.table_info_selector.valid_write_ids = table.validWriteIds_;
+    }
     // TODO(todd): fetch incremental stats on-demand for compute-incremental-stats.
     req.table_info_selector.want_partition_stats = true;
     TGetPartialCatalogObjectResponse resp = sendRequest(req);
@@ -992,7 +1010,8 @@ public class CatalogdMetaProvider implements MetaProvider {
       PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(part.getHms_parameters(),
           part.write_id, hdfsStorageDescriptor,
           fds, insertFds, deleteFds, part.getPartition_stats(),
-          part.has_incremental_stats, part.is_marked_cached, location);
+          part.has_incremental_stats, part.is_marked_cached, location,
+          part.last_compaction_id);
 
       checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id);
 
@@ -1565,12 +1584,13 @@ public class CatalogdMetaProvider implements MetaProvider {
     private final byte[] partitionStats_;
     private final boolean hasIncrementalStats_;
     private final boolean isMarkedCached_;
+    private final long lastCompactionId_;
 
     public PartitionMetadataImpl(Map<String, String> hmsParameters, long writeId,
         HdfsStorageDescriptor hdfsStorageDescriptor, ImmutableList<FileDescriptor> fds,
         ImmutableList<FileDescriptor> insertFds, ImmutableList<FileDescriptor> deleteFds,
         byte[] partitionStats, boolean hasIncrementalStats, boolean isMarkedCached,
-        HdfsPartitionLocationCompressor.Location location) {
+        HdfsPartitionLocationCompressor.Location location, long lastCompactionId) {
       this.hmsParameters_ = hmsParameters;
       this.writeId_ = writeId;
       this.hdfsStorageDescriptor_ = hdfsStorageDescriptor;
@@ -1581,6 +1601,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       this.partitionStats_ = partitionStats;
       this.hasIncrementalStats_ = hasIncrementalStats;
       this.isMarkedCached_ = isMarkedCached;
+      this.lastCompactionId_ = lastCompactionId;
     }
 
     /**
@@ -1598,7 +1619,7 @@ public class CatalogdMetaProvider implements MetaProvider {
           deleteFds_, origIndex, dstIndex);
       return new PartitionMetadataImpl(hmsParameters_, writeId_, hdfsStorageDescriptor_,
           fds, insertFds, deleteFds, partitionStats_, hasIncrementalStats_,
-          isMarkedCached_, location_);
+          isMarkedCached_, location_, lastCompactionId_);
     }
 
     private static ImmutableList<FileDescriptor> cloneFdsRelativeToHostIndex(
@@ -1653,6 +1674,9 @@ public class CatalogdMetaProvider implements MetaProvider {
 
     @Override
     public boolean isMarkedCached() { return isMarkedCached_; }
+
+    @Override
+    public long getLastCompactionId() { return lastCompactionId_; }
   }
 
   /**
@@ -1715,6 +1739,12 @@ public class CatalogdMetaProvider implements MetaProvider {
       return String.format("TableMetaRef %s.%s@%d", dbName_, tableName_, catalogVersion_);
     }
 
+    @Override
+    public boolean isPartitioned() {
+      return msTable_.getPartitionKeysSize() != 0;
+    }
+
+    @Override
     public boolean isMarkedCached() { return isMarkedCached_; }
 
     public HdfsPartitionLocationCompressor getPartitionLocationCompressor() {
@@ -1724,6 +1754,11 @@ public class CatalogdMetaProvider implements MetaProvider {
     public List<String> getPartitionPrefixes() {
       return partitionLocationCompressor_.getPrefixes();
     }
+
+    @Override
+    public boolean isTransactional() {
+      return AcidUtils.isTransactionalTable(msTable_.getParameters());
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 24411a2..c91150d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -19,16 +19,26 @@ package org.apache.impala.catalog.local;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
+import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -37,6 +47,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CompactionInfoLoader;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FileMetadataLoader;
 import org.apache.impala.catalog.Function;
@@ -48,6 +59,7 @@ import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.SqlConstraints;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TBackendGflags;
@@ -55,6 +67,7 @@ import org.apache.impala.thrift.TBriefTableMeta;
 import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TValidWriteIdList;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.thrift.TException;
@@ -67,12 +80,15 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.errorprone.annotations.Immutable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Metadata provider which calls out directly to the source systems
  * (filesystem, HMS, etc) with no caching.
  */
 class DirectMetaProvider implements MetaProvider {
+  private final static Logger LOG = LoggerFactory.getLogger(DirectMetaProvider.class);
   private static MetaStoreClientPool msClientPool_;
 
   DirectMetaProvider() {
@@ -465,6 +481,12 @@ class DirectMetaProvider implements MetaProvider {
       throw new UnsupportedOperationException("Hdfs caching not supported with " +
           "DirectMetaProvider implementation");
     }
+
+    @Override
+    public long getLastCompactionId() {
+      throw new UnsupportedOperationException("Compaction id is not provided with " +
+          "DirectMetaProvider implementation");
+    }
   }
 
   private class TableMetaRefImpl implements TableMetaRef {
@@ -479,7 +501,8 @@ class DirectMetaProvider implements MetaProvider {
       this.msTable_ = msTable;
     }
 
-    private boolean isPartitioned() {
+    @Override
+    public boolean isPartitioned() {
       return msTable_.getPartitionKeysSize() != 0;
     }
 
@@ -493,6 +516,11 @@ class DirectMetaProvider implements MetaProvider {
     public List<String> getPartitionPrefixes() {
       return Collections.emptyList();
     }
+
+    @Override
+    public boolean isTransactional() {
+      return AcidUtils.isTransactionalTable(msTable_.getParameters());
+    }
   }
 
   @Override
@@ -501,6 +529,58 @@ class DirectMetaProvider implements MetaProvider {
         "getValidWriteIdList() is not implemented for DirectMetaProvider");
   }
 
+  /**
+   * Fetches the latest compaction id from HMS and compares with partition metadata in
+   * cache. If a partition is stale due to compaction, removes it from metas.
+   */
+  public List<PartitionRef> checkLatestCompaction(String dbName, String tableName,
+      TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException {
+    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+    Preconditions.checkNotNull(metas, "Partition map must be non-null");
+    Stopwatch sw = Stopwatch.createStarted();
+    List<PartitionRef> stalePartitions = new ArrayList<>();
+    if (!table.isTransactional() || metas.isEmpty()) return stalePartitions;
+    GetLatestCommittedCompactionInfoRequest request =
+        new GetLatestCommittedCompactionInfoRequest(dbName, tableName);
+    if (table.isPartitioned()) {
+      request.setPartitionnames(metas.keySet().stream()
+          .map(PartitionRef::getName).collect(Collectors.toList()));
+    }
+    GetLatestCommittedCompactionInfoResponse response;
+    try (MetaStoreClientPool.MetaStoreClient client = msClientPool_.getClient()) {
+      response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
+    }
+    Map<String, Long> partNameToCompactionId = new HashMap<>();
+    // If the table is partitioned, we must set partition name, otherwise empty result
+    // will be returned.
+    if (table.isPartitioned()) {
+      for (CompactionInfoStruct ci : response.getCompactions()) {
+        partNameToCompactionId.put(
+            Preconditions.checkNotNull(ci.getPartitionname()), ci.getId());
+      }
+    } else {
+      CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(),
+          null);
+      if (ci != null) {
+        partNameToCompactionId.put(PartitionRefImpl.UNPARTITIONED_NAME, ci.getId());
+      }
+    }
+    Iterator<Map.Entry<PartitionRef, PartitionMetadata>> iter =
+        metas.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
+      long latestCompactionId = partNameToCompactionId.getOrDefault(
+          entry.getKey().getName(), -1L);
+      if (entry.getValue().getLastCompactionId() < latestCompactionId) {
+        stalePartitions.add(entry.getKey());
+        iter.remove();
+      }
+    }
+    LOG.debug("Checked the latest compaction id for {}.{} Time taken: {}", dbName,
+        tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
+    return stalePartitions;
+  }
+
   @Override
   public FeIcebergTable.Snapshot loadIcebergSnapshot(final TableMetaRef table,
       ListMap<TNetworkAddress> hostIndex)
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 4fc0aa0..524871d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -141,6 +141,8 @@ public interface MetaProvider {
   interface TableMetaRef {
     boolean isMarkedCached();
     List<String> getPartitionPrefixes();
+    boolean isPartitioned();
+    boolean isTransactional();
   }
 
   /**
@@ -166,6 +168,7 @@ public interface MetaProvider {
     byte[] getPartitionStats();
     boolean hasIncrementalStats();
     boolean isMarkedCached();
+    long getLastCompactionId();
   }
 
   public TValidWriteIdList getValidWriteIdList(TableMetaRef ref);
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 898ae90..1befe4b 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -341,4 +341,8 @@ public class BackendConfig {
   public boolean getHMSEventIncrementalRefreshTransactionalTable() {
     return backendCfg_.hms_event_incremental_refresh_transactional_table;
   }
+
+  public boolean isAutoCheckCompaction() {
+    return backendCfg_.auto_check_compaction;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 8d68f64..4d795bb 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -805,8 +805,13 @@ public class AcidUtils {
       request.setPartitionnames(partNames);
     }
 
-    GetLatestCommittedCompactionInfoResponse response =
-        CompactionInfoLoader.getLatestCompactionInfo(catalog, request);
+    GetLatestCommittedCompactionInfoResponse response;
+    try (MetaStoreClientPool.MetaStoreClient client = catalog.getMetaStoreClient()) {
+      response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
+    } catch (Exception e) {
+      throw new CatalogException("Error getting latest compaction info for "
+          + hdfsTable.getFullName(), e);
+    }
 
     Map<String, Long> partNameToCompactionId = new HashMap<>();
     if (hdfsTable.isPartitioned()) {
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index 20e5d32..47c00f8 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -36,8 +36,13 @@ import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
 import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.service.Frontend;
 import org.apache.impala.service.FrontendProfile;
+import org.apache.impala.testutil.HiveJdbcClientPool;
+import org.apache.impala.testutil.HiveJdbcClientPool.HiveJdbcClient;
+import org.apache.impala.testutil.ImpalaJdbcClient;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TBriefTableMeta;
@@ -71,6 +76,12 @@ public class CatalogdMetaProviderTest {
 
   private CacheStats prevStats_;
 
+  private static HiveJdbcClientPool hiveJdbcClientPool_;
+  private static final String testDbName_ = "catalogd_meta_provider_test";
+  private static final String testTblName_ = "insert_only";
+  private static final String testPartitionedTblName_ = "insert_only_partitioned";
+  private static final String testFullAcidTblName_ = "full_acid";
+
   static {
     FeSupport.loadLibrary();
   }
@@ -84,6 +95,7 @@ public class CatalogdMetaProviderTest {
     Pair<Table, TableMetaRef> tablePair = provider_.loadTable("functional", "alltypes");
     tableRef_ = tablePair.second;
     prevStats_ = provider_.getCacheStats();
+    hiveJdbcClientPool_ = HiveJdbcClientPool.create(1);
   }
 
   private CacheStats diffStats() {
@@ -94,6 +106,50 @@ public class CatalogdMetaProviderTest {
     return diff;
   }
 
+  private void createTestTbls() throws Exception {
+    ImpalaJdbcClient client = ImpalaJdbcClient.createClientUsingHiveJdbcDriver();
+    client.connect();
+    try {
+      client.execStatement("drop database if exists " + testDbName_ + " cascade");
+      client.execStatement("create database " + testDbName_);
+      client.execStatement("create table " + getTestTblName() + " like "
+          + "functional.insert_only_transactional_table stored as parquet");
+      client.execStatement("create table " + getTestPartitionedTblName()
+          + " (c1 int) partitioned by (part int) stored as parquet "
+          + "tblproperties ('transactional'='true', 'transactional_properties'="
+          + "'insert_only')");
+      client.execStatement("create table " + getTestFullAcidTblName()
+          + " (c1 int) partitioned by (part int) stored as orc "
+          + "tblproperties ('transactional'='true')");
+    } finally {
+      client.close();
+    }
+  }
+
+  private void dropTestTbls() throws Exception {
+    try (HiveJdbcClient hiveClient = hiveJdbcClientPool_.getClient()) {
+      hiveClient.executeSql("drop database if exists " + testDbName_ + " cascade");
+    }
+  }
+
+  private static String getTestTblName() {
+    return testDbName_ + "." + testTblName_;
+  }
+
+  private static String getTestPartitionedTblName() {
+    return testDbName_ + "." + testPartitionedTblName_;
+  }
+
+  private static String getTestFullAcidTblName() {
+    return testDbName_ + "." + testFullAcidTblName_;
+  }
+
+  private void executeHiveSql(String query) throws Exception {
+    try (HiveJdbcClient hiveClient = hiveJdbcClientPool_.getClient()) {
+      hiveClient.executeSql(query);
+    }
+  }
+
   @Test
   public void testCachePartitionList() throws Exception {
     List<PartitionRef> partList = provider_.loadPartitionList(tableRef_);
@@ -145,6 +201,26 @@ public class CatalogdMetaProviderTest {
         partRefs);
   }
 
+  /**
+   * Helper method for loading partitions by dbName and tableName. Retries when there is
+   * inconsistent metadata.
+   */
+  private Map<String, PartitionMetadata> loadPartitions(String dbName, String tableName)
+      throws Exception {
+    Frontend.RetryTracker retryTracker = new Frontend.RetryTracker(
+        String.format("load partitions for table %s.%s", dbName, tableName));
+    while (true) {
+      try {
+        Pair<Table, TableMetaRef> tablePair = provider_.loadTable(dbName, tableName);
+        List<PartitionRef> allRefs = provider_.loadPartitionList(tablePair.second);
+        return loadPartitions(tablePair.second, allRefs);
+      } catch (InconsistentMetadataFetchException e) {
+        diffStats();
+        retryTracker.handleRetryOrThrow(e);
+      }
+    }
+  }
+
   @Test
   public void testCacheColumnStats() throws Exception {
     ImmutableList<String> colNames = ImmutableList.of("month", "id");
@@ -428,4 +504,127 @@ public class CatalogdMetaProviderTest {
     assertEquals(1, stats.missCount());
   }
 
+  @Test
+  public void testFullAcidFileMetadataAfterMajorCompaction() throws Exception {
+    boolean origFlag = BackendConfig.INSTANCE.isAutoCheckCompaction();
+    try {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(true);
+      createTestTbls();
+      String partition = "partition (part=1)";
+      testFileMetadataAfterCompaction(testDbName_, testFullAcidTblName_, partition,
+          true);
+    } finally {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(origFlag);
+      dropTestTbls();
+    }
+  }
+
+  @Test
+  public void testFullAcidFileMetadataAfterMinorCompaction() throws Exception {
+    boolean origFlag = BackendConfig.INSTANCE.isAutoCheckCompaction();
+    try {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(true);
+      createTestTbls();
+      String partition = "partition (part=1)";
+      testFileMetadataAfterCompaction(testDbName_, testFullAcidTblName_, partition,
+          false);
+    } finally {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(origFlag);
+      dropTestTbls();
+    }
+  }
+
+  @Test
+  public void testTableFileMetadataAfterMajorCompaction() throws Exception {
+    boolean origFlag = BackendConfig.INSTANCE.isAutoCheckCompaction();
+    try {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(true);
+      createTestTbls();
+      testFileMetadataAfterCompaction(testDbName_, testTblName_, "", true);
+    } finally {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(origFlag);
+      dropTestTbls();
+    }
+  }
+
+  @Test
+  public void testTableFileMetadataAfterMinorCompaction() throws Exception {
+    boolean origFlag = BackendConfig.INSTANCE.isAutoCheckCompaction();
+    try {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(true);
+      createTestTbls();
+      testFileMetadataAfterCompaction(testDbName_, testTblName_, "", false);
+    } finally {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(origFlag);
+      dropTestTbls();
+    }
+  }
+
+  @Test
+  public void testPartitionFileMetadataAfterMajorCompaction() throws Exception {
+    boolean origFlag = BackendConfig.INSTANCE.isAutoCheckCompaction();
+    try {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(true);
+      createTestTbls();
+      String partition = "partition (part=1)";
+      testFileMetadataAfterCompaction(testDbName_, testPartitionedTblName_, partition,
+          true);
+    } finally {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(origFlag);
+      dropTestTbls();
+    }
+  }
+
+  @Test
+  public void testPartitionFileMetadataAfterMinorCompaction() throws Exception {
+    boolean origFlag = BackendConfig.INSTANCE.isAutoCheckCompaction();
+    try {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(true);
+      createTestTbls();
+      String partition = "partition (part=1)";
+      testFileMetadataAfterCompaction(testDbName_, testPartitionedTblName_, partition,
+          false);
+    } finally {
+      BackendConfig.INSTANCE.getBackendCfg().setAuto_check_compaction(origFlag);
+      dropTestTbls();
+    }
+  }
+
+  private void testFileMetadataAfterCompaction(String dbName, String tableName,
+      String partition, boolean isMajorCompaction) throws Exception {
+    String tableOrPartition = dbName + "." + tableName + " " + partition;
+    executeHiveSql("insert into " + tableOrPartition + " values (1)");
+    executeHiveSql("insert into " + tableOrPartition + " values (2)");
+    loadPartitions(dbName, tableName);
+    // load again to make sure the partition is in cache.
+    Map<String, PartitionMetadata> partMap;
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      partMap = loadPartitions(dbName, tableName);
+      FrontendProfile profile = FrontendProfile.getCurrent();
+      TRuntimeProfileNode prof = profile.emitAsThrift();
+      Map<String, TCounter> counters = Maps.uniqueIndex(prof.counters, TCounter::getName);
+      assertEquals(1, counters.get("CatalogFetch.Partitions.Requests").getValue());
+      assertEquals(1, counters.get("CatalogFetch.Partitions.Hits").getValue());
+      int preFileCount = partMap.values().stream()
+          .map(PartitionMetadata::getFileDescriptors).mapToInt(List::size).sum();
+      assertEquals(2, preFileCount);
+    }
+
+    String compactionType = isMajorCompaction ? "'major'" : "'minor'";
+    executeHiveSql(
+        "alter table " + tableOrPartition + " compact " + compactionType + " and wait");
+    // After Compaction, it should fetch the partition from catalogd again, and
+    // file metadata should be updated.
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      partMap = loadPartitions(dbName, tableName);
+      FrontendProfile profile = FrontendProfile.getCurrent();
+      TRuntimeProfileNode prof = profile.emitAsThrift();
+      Map<String, TCounter> counters = Maps.uniqueIndex(prof.counters, TCounter::getName);
+      assertEquals(1, counters.get("CatalogFetch.Partitions.Requests").getValue());
+      assertEquals(1, counters.get("CatalogFetch.Partitions.Misses").getValue());
+      int afterFileCount = partMap.values().stream()
+          .map(PartitionMetadata::getFileDescriptors).mapToInt(List::size).sum();
+      assertEquals(1, afterFileCount);
+    }
+  }
 }