You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/04 19:39:56 UTC

(impala) 01/03: IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 142ae261e319c3b602128deb1329713d5253637d
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Thu Dec 14 12:45:34 2023 +0530

    IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider
    
    This commit removes the CatalogdMetaProvider dependence on
    DirectMetaProvider. CatalogdMetaProvider delegated 2 calls to
    DirectMetaProvider: loadNullPartitionKeyValue() and
    checkLatestCompaction(). Equivalent catalogd RPCs (GetNullPartitionName
    and GetLatestCompactions) are now implemented on the catalog server and
    used instead. DirectMetaProvider is no longer referenced anywhere, but
    it is retained for future use.
    
    Testing:
    - Tested manually. CatalogdMetaProviderTest covers the changed code
      paths, including the new testLoadNullPartitionKeyValue test.
    
    Change-Id: I096c1b1d1a52e979c8b2d8173dae9ca2cc6c36d2
    Reviewed-on: http://gerrit.cloudera.org:8080/20791
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 28 ++++++++
 be/src/catalog/catalog-service-client-wrapper.h    | 16 +++++
 be/src/catalog/catalog.cc                          | 11 ++++
 be/src/catalog/catalog.h                           |  9 +++
 be/src/exec/catalog-op-executor.cc                 | 28 ++++++++
 be/src/exec/catalog-op-executor.h                  |  8 +++
 be/src/service/fe-support.cc                       | 48 ++++++++++++++
 common/thrift/CatalogService.thrift                | 35 ++++++++++
 .../org/apache/impala/compat/MetastoreShim.java    |  9 +++
 .../org/apache/impala/compat/MetastoreShim.java    | 77 ++++++++++++----------
 .../impala/catalog/local/CatalogdMetaProvider.java | 67 ++++++++++++++++---
 .../java/org/apache/impala/service/FeSupport.java  | 42 ++++++++++++
 .../java/org/apache/impala/service/JniCatalog.java | 32 +++++++++
 .../catalog/local/CatalogdMetaProviderTest.java    | 15 +++++
 14 files changed, 382 insertions(+), 43 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 43f77e970..cc1667731 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -363,6 +363,34 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
     VLOG_RPC << "UpdateTableUsage(): response.status=" << resp.status;
   }
 
+  void GetNullPartitionName(TGetNullPartitionNameResponse& resp,
+      const TGetNullPartitionNameRequest& req) override {
+    VLOG_RPC << "GetNullPartitionName(): request=" << ThriftDebugString(req);
+    Status status = CheckProtocolVersion(req.protocol_version);
+    if (status.ok()) {
+      status = catalog_server_->catalog()->GetNullPartitionName(&resp);
+    }
+    if (!status.ok()) LOG(ERROR) << status.GetDetail();
+    TStatus thrift_status;
+    status.ToThrift(&thrift_status);
+    resp.__set_status(thrift_status);
+    VLOG_RPC << "GetNullPartitionName(): response=" << ThriftDebugStringNoThrow(resp);
+  }
+
+  void GetLatestCompactions(TGetLatestCompactionsResponse& resp,
+      const TGetLatestCompactionsRequest& req) override {
+    VLOG_RPC << "GetLatestCompactions(): request=" << ThriftDebugString(req);
+    Status status = CheckProtocolVersion(req.protocol_version);
+    if (status.ok()) {
+      status = catalog_server_->catalog()->GetLatestCompactions(req, &resp);
+    }
+    if (!status.ok()) LOG(ERROR) << status.GetDetail();
+    TStatus thrift_status;
+    status.ToThrift(&thrift_status);
+    resp.__set_status(thrift_status);
+    VLOG_RPC << "GetLatestCompactions(): response=" << ThriftDebugStringNoThrow(resp);
+  }
+
  private:
   CatalogServer* catalog_server_;
 
diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h
index 294960771..6775e4594 100644
--- a/be/src/catalog/catalog-service-client-wrapper.h
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -110,6 +110,22 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
     *send_done = true;
     recv_UpdateTableUsage(_return);
   }
+
+  void GetNullPartitionName(TGetNullPartitionNameResponse& _return,
+      const TGetNullPartitionNameRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetNullPartitionName(req);
+    *send_done = true;
+    recv_GetNullPartitionName(_return);
+  }
+
+  void GetLatestCompactions(TGetLatestCompactionsResponse& _return,
+      const TGetLatestCompactionsRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetLatestCompactions(req);
+    *send_done = true;
+    recv_GetLatestCompactions(_return);
+  }
 #pragma clang diagnostic pop
 };
 
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 698091787..92b306b51 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -71,6 +71,8 @@ Catalog::Catalog() {
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
     {"regenerateServiceId", "()V", &regenerate_service_id_},
     {"refreshDataSources", "()V", &refresh_data_sources_},
+    {"getNullPartitionName", "()[B", &get_null_partition_name_id_},
+    {"getLatestCompactions", "([B)[B", &get_latest_compactions_id_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -220,3 +222,12 @@ void Catalog::RegenerateServiceId() {
 Status Catalog::RefreshDataSources() {
   return JniUtil::CallJniMethod(catalog_, refresh_data_sources_);
 }
+
+Status Catalog::GetNullPartitionName(TGetNullPartitionNameResponse* resp) {
+  return JniUtil::CallJniMethod(catalog_, get_null_partition_name_id_, resp);
+}
+
+Status Catalog::GetLatestCompactions(
+    const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp) {
+  return JniUtil::CallJniMethod(catalog_, get_latest_compactions_id_, req, resp);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index c94f09a6b..f102a2908 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -137,6 +137,13 @@ class Catalog {
   /// report.
   Status UpdateTableUsage(const TUpdateTableUsageRequest& req);
 
+  /// Gets the null partition name.
+  Status GetNullPartitionName(TGetNullPartitionNameResponse* resp);
+
+  /// Gets the latest compactions for the request.
+  Status GetLatestCompactions(
+      const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp);
+
   /// Regenerate Catalog Service ID.
   /// The function should be called when the CatalogD becomes active.
   void RegenerateServiceId();
@@ -170,6 +177,8 @@ class Catalog {
   jmethodID update_table_usage_id_;
   jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId()
   jmethodID refresh_data_sources_; // JniCatalog.refreshDataSources()
+  jmethodID get_null_partition_name_id_; // JniCatalog.getNullPartitionName()
+  jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions()
 };
 
 }
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index a7fb2381b..074d828e3 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -422,3 +422,31 @@ Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
   RETURN_IF_ERROR(rpc_status.status);
   return Status::OK();
 }
+
+Status CatalogOpExecutor::GetNullPartitionName(
+    const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result) {
+  int attempt = 0; // Used for debug action only.
+  CatalogServiceConnection::RpcStatus rpc_status =
+      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+          *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+          &CatalogServiceClientWrapper::GetNullPartitionName, req,
+          FLAGS_catalog_client_connection_num_retries,
+          FLAGS_catalog_client_rpc_retry_interval_ms,
+          [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
+  RETURN_IF_ERROR(rpc_status.status);
+  return Status::OK();
+}
+
+Status CatalogOpExecutor::GetLatestCompactions(
+    const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result) {
+  int attempt = 0; // Used for debug action only.
+  CatalogServiceConnection::RpcStatus rpc_status =
+      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+          *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+          &CatalogServiceClientWrapper::GetLatestCompactions, req,
+          FLAGS_catalog_client_connection_num_retries,
+          FLAGS_catalog_client_rpc_retry_interval_ms,
+          [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
+  RETURN_IF_ERROR(rpc_status.status);
+  return Status::OK();
+}
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index cab823bc6..12bf2e0f1 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -80,6 +80,14 @@ class CatalogOpExecutor {
   Status UpdateTableUsage(const TUpdateTableUsageRequest& req,
       TUpdateTableUsageResponse* resp);
 
+  /// Makes an RPC to the catalog server to get the null partition name.
+  Status GetNullPartitionName(
+      const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result);
+
+  /// Makes an RPC to the catalog server to get the latest compactions.
+  Status GetLatestCompactions(
+      const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result);
+
   /// Set in Exec(), returns a pointer to the TDdlExecResponse of the DDL execution.
   /// If called before Exec(), this will return NULL. Only set if the
   /// TCatalogOpType is DDL.
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index a31979ce3..f4bee52e6 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -695,6 +695,46 @@ Java_org_apache_impala_service_FeSupport_nativeParseDateString(JNIEnv* env,
   return result_bytes;
 }
 
+// Native method to make a request to catalog server to get the null partition name.
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName(
+    JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) {
+  TGetNullPartitionNameRequest request;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TGetNullPartitionNameResponse result;
+  Status status = catalog_op_executor.GetNullPartitionName(request, &result);
+  if (!status.ok()) {
+    LOG(ERROR) << status.GetDetail();
+    status.ToThrift(&result.status);
+  }
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
+// Native method to make a request to catalog server to get the latest compactions.
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions(
+    JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) {
+  TGetLatestCompactionsRequest request;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TGetLatestCompactionsResponse result;
+  Status status = catalog_op_executor.GetLatestCompactions(request, &result);
+  if (!status.ok()) {
+    LOG(ERROR) << status.GetDetail();
+    status.ToThrift(&result.status);
+  }
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -770,6 +810,14 @@ static JNINativeMethod native_methods[] = {
     const_cast<char*>("(Ljava/lang/String;)[B"),
     (void*)::Java_org_apache_impala_service_FeSupport_nativeParseDateString
   },
+  {
+    const_cast<char*>("NativeGetNullPartitionName"), const_cast<char*>("([B)[B"),
+    (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName
+  },
+  {
+    const_cast<char*>("NativeGetLatestCompactions"), const_cast<char*>("([B)[B"),
+    (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 99d210d06..701f10641 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -666,6 +666,35 @@ struct TGetPartitionStatsResponse {
   2: optional map<string, binary> partition_stats
 }
 
+// Request null partition name.
+struct TGetNullPartitionNameRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+}
+
+// Response for null partition name request.
+struct TGetNullPartitionNameResponse {
+  1: required Status.TStatus status
+  // Null partition name.
+  2: required string partition_value
+}
+
+// Request for the latest compactions of a table or of the listed partitions.
+struct TGetLatestCompactionsRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+  2: required string db_name
+  3: required string table_name
+  4: required string non_parition_name  // (sic) result key for an unpartitioned table
+  5: optional list<string> partition_names  // unset => table handled as unpartitioned
+  6: required i64 last_compaction_id  // only forwarded to HMS when > 0
+}
+
+// Response for latest compactions request.
+struct TGetLatestCompactionsResponse {
+  1: required Status.TStatus status
+  // Partition name (or the request's non_parition_name) to its latest compaction id
+  2: required map<string, i64> partition_to_compaction_id
+}
+
 // Instructs the Catalog Server to prioritizing loading of metadata for the specified
 // catalog objects. Currently only used for controlling the priority of loading
 // tables/views since Db/Function metadata is loaded on startup.
@@ -736,4 +765,10 @@ service CatalogService {
   // Update recently used tables and their usage counts in an impalad since the last
   // report.
   TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req);
+
+  // Gets the null partition name used at HMS.
+  TGetNullPartitionNameResponse GetNullPartitionName(1: TGetNullPartitionNameRequest req);
+
+  // Gets the latest compactions.
+  TGetLatestCompactionsResponse GetLatestCompactions(1: TGetLatestCompactionsRequest req);
 }
diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 038f6eb1e..21f0a7266 100644
--- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -589,6 +589,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         "getPartitionsForRefreshingFileMetadata is not supported.");
   }
 
+  /**
+   * CDP Hive-3 only function.
+   */
+  public static Map<String, Long> getLatestCompactions(MetaStoreClient client,
+      String dbName, String tableName, List<String> partitionNames,
+      String unPartitionedName, long lastCompactionId) throws TException {
+    throw new UnsupportedOperationException("getLatestCompactions is not supported.");
+  }
+
   /**
    * CDP Hive-3 only function.
    */
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index d3f33e8cb..a1ea42fe8 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -743,39 +743,19 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
   }
 
   /**
-   * Fetches the latest compaction id from HMS and compares with partition metadata in
-   * cache. If a partition is stale due to compaction, removes it from metas.
+   * Fetches the latest compactions from HMS as a map of partition name to compaction id.
    */
-  public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
-      String dbName, String tableName, TableMetaRef table,
-      Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
-      throws TException {
-    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
-    Preconditions.checkNotNull(metas, "Partition map must be non-null");
-    if (metas.isEmpty()) {
-      return Collections.emptyList();
-    }
-    Stopwatch sw = Stopwatch.createStarted();
-    List<PartitionRef> stalePartitions = new ArrayList<>();
-    if (!table.isTransactional() || metas.isEmpty()) return stalePartitions;
+  public static Map<String, Long> getLatestCompactions(MetaStoreClient client,
+      String dbName, String tableName, List<String> partitionNames,
+      String unPartitionedName, long lastCompactionId) throws TException {
     GetLatestCommittedCompactionInfoRequest request =
         new GetLatestCommittedCompactionInfoRequest(dbName, tableName);
-    if (table.isPartitioned()) {
-      request.setPartitionnames(metas.keySet().stream()
-          .map(PartitionRef::getName).collect(Collectors.toList()));
-    }
-    long lastCompactionId = metas.values().stream()
-        .mapToLong(p -> p.getLastCompactionId()).max().orElse(-1);
-    if (lastCompactionId > 0) {
-      request.setLastCompactionId(lastCompactionId);
-    }
-
+    request.setPartitionnames(partitionNames);
+    if (lastCompactionId > 0) request.setLastCompactionId(lastCompactionId);
     GetLatestCommittedCompactionInfoResponse response;
-    try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
-      response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
-    }
+    response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
     Map<String, Long> partNameToCompactionId = new HashMap<>();
-    if (table.isPartitioned()) {
+    if (partitionNames != null) {
       for (CompactionInfoStruct ci : response.getCompactions()) {
         if (ci.getPartitionname() != null) {
           partNameToCompactionId.put(ci.getPartitionname(), ci.getId());
@@ -786,14 +766,41 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         }
       }
     } else {
-      CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(),
-          null);
-      if (ci != null) {
-        partNameToCompactionId.put(unPartitionedName, ci.getId());
-      }
+      CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(), null);
+      if (ci != null) partNameToCompactionId.put(unPartitionedName, ci.getId());
+    }
+    return partNameToCompactionId;
+  }
+
+  /**
+   * Fetches the latest compaction id from HMS and compares with partition metadata in
+   * cache. If a partition is stale due to compaction, removes it from metas.
+   */
+  public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
+      String dbName, String tableName, TableMetaRef table,
+      Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
+      throws TException {
+    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+    Preconditions.checkNotNull(metas, "Partition map must be non-null");
+    if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList();
+    Stopwatch sw = Stopwatch.createStarted();
+    List<String> partitionNames = null;
+    if (table.isPartitioned()) {
+      partitionNames =
+          metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList());
     }
-    Iterator<Entry<PartitionRef, PartitionMetadata>> iter =
-        metas.entrySet().iterator();
+    long lastCompactionId = metas.values()
+                                .stream()
+                                .mapToLong(PartitionMetadata::getLastCompactionId)
+                                .max()
+                                .orElse(-1);
+    Map<String, Long> partNameToCompactionId = Collections.emptyMap();
+    try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
+      partNameToCompactionId = getLatestCompactions(
+          client, dbName, tableName, partitionNames, unPartitionedName, lastCompactionId);
+    }
+    List<PartitionRef> stalePartitions = new ArrayList<>();
+    Iterator<Entry<PartitionRef, PartitionMetadata>> iter = metas.entrySet().iterator();
     while (iter.hasNext()) {
       Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
       if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index cbfbfbf96..2a7b611b1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -20,8 +20,10 @@ package org.apache.impala.catalog.local;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,6 +65,7 @@ import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
 import org.apache.impala.catalog.Principal;
 import org.apache.impala.catalog.PrincipalPrivilege;
@@ -71,6 +74,7 @@ import org.apache.impala.catalog.VirtualColumn;
 import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.FrontendProfile;
@@ -88,6 +92,8 @@ import org.apache.impala.thrift.TDbInfoSelector;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionName;
+import org.apache.impala.thrift.TGetLatestCompactionsRequest;
+import org.apache.impala.thrift.TGetLatestCompactionsResponse;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsFileDesc;
@@ -117,6 +123,7 @@ import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -261,11 +268,6 @@ public class CatalogdMetaProvider implements MetaProvider {
   private final ListMap<TNetworkAddress> cacheHostIndex_ =
       new ListMap<TNetworkAddress>();
 
-  // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
-  // of metadata. In order to incrementally build this out, we delegate various calls
-  // to the "direct" provider for now and circumvent catalogd.
-  private DirectMetaProvider directProvider_ = new DirectMetaProvider();
-
   /**
    * Number of requests which piggy-backed on a concurrent request for the same key,
    * and resulted in success. Used only for test assertions.
@@ -948,8 +950,8 @@ public class CatalogdMetaProvider implements MetaProvider {
         hostIndex, partitionRefs);
     if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
       // If any partitions are stale after compaction, they will be removed from refToMeta
-      List<PartitionRef> stalePartitions = directProvider_.checkLatestCompaction(
-          refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
+      List<PartitionRef> stalePartitions =
+          checkLatestCompaction(refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
       cache_.invalidateAll(stalePartitions.stream()
           .map(PartitionRefImpl.class::cast)
           .map(PartitionRefImpl::getId)
@@ -988,6 +990,55 @@ public class CatalogdMetaProvider implements MetaProvider {
     return nameToMeta;
   }
 
+  /**
+   * Requests the latest compactions from catalogd, passing the highest compaction id
+   * cached in 'metas'. Every partition named in the response is treated as stale: it is
+   * removed from 'metas' and included in the returned list.
+   */
+  private List<PartitionRef> checkLatestCompaction(String dbName, String tableName,
+      TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException {
+    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+    Preconditions.checkNotNull(metas, "Partition map must be non-null");
+    if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList();
+    Stopwatch sw = Stopwatch.createStarted();
+    TGetLatestCompactionsRequest req = new TGetLatestCompactionsRequest();
+    req.db_name = dbName;
+    req.table_name = tableName;
+    req.non_parition_name = HdfsTable.DEFAULT_PARTITION_NAME;
+    if (table.isPartitioned()) {
+      req.partition_names =
+          metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList());
+    }
+    req.last_compaction_id = metas.values()
+                                 .stream()
+                                 .mapToLong(PartitionMetadata::getLastCompactionId)
+                                 .max()
+                                 .orElse(-1);
+    byte[] ret = FeSupport.GetLatestCompactions(
+        new TSerializer(new TBinaryProtocol.Factory()).serialize(req));
+    TGetLatestCompactionsResponse resp = new TGetLatestCompactionsResponse();
+    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(resp, ret);
+    if (resp.status.status_code != TErrorCode.OK) {
+      throw new TException(Joiner.on("\n").join(resp.status.getError_msgs()));
+    }
+    Map<String, Long> partNameToCompactionId = resp.partition_to_compaction_id;
+    Preconditions.checkNotNull(
+        partNameToCompactionId, "Partition name to compaction id map must be non-null");
+    List<PartitionRef> stalePartitions = new ArrayList<>();
+    Iterator<Map.Entry<PartitionRef, PartitionMetadata>> iter =
+        metas.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
+      if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
+        stalePartitions.add(entry.getKey());
+        iter.remove();
+      }
+    }
+    LOG.info("Checked the latest compaction info for {}.{}. Time taken: {}", dbName,
+        tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
+    return stalePartitions;
+  }
+
   /**
    * Load the specified partitions 'prefs' from catalogd. The partitions are made
    * relative to the given 'hostIndex' before being returned.
@@ -1189,7 +1240,7 @@ public class CatalogdMetaProvider implements MetaProvider {
           /** Called to load cache for cache misses */
           @Override
           public String call() throws Exception {
-            return directProvider_.loadNullPartitionKeyValue();
+            return FeSupport.GetNullPartitionName();
           }
         });
   }
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index e8ea8430a..4b56ca15c 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -36,6 +36,8 @@ import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TExprBatch;
+import org.apache.impala.thrift.TGetNullPartitionNameRequest;
+import org.apache.impala.thrift.TGetNullPartitionNameResponse;
 import org.apache.impala.thrift.TGetPartitionStatsRequest;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
 import org.apache.impala.thrift.TParseDateStringResult;
@@ -140,6 +142,12 @@ public class FeSupport {
   // E.g.: '2011-01-01', '2011-01-1', '2011-1-01', '2011-01-01'.
   public native static byte[] nativeParseDateString(String date);
 
+  // Does an RPC to the Catalog Server to get the null partition name.
+  public native static byte[] NativeGetNullPartitionName(byte[] thriftReq);
+
+  // Does an RPC to the Catalog Server to get the latest compactions.
+  public native static byte[] NativeGetLatestCompactions(byte[] thriftReq);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -490,6 +498,40 @@ public class FeSupport {
     }
   }
 
+  private static byte[] GetNullPartitionName(byte[] thriftReq) {
+    try {
+      return NativeGetNullPartitionName(thriftReq);
+    } catch (UnsatisfiedLinkError e) { loadLibrary(); }
+    return NativeGetNullPartitionName(thriftReq);
+  }
+
+  public static String GetNullPartitionName() throws InternalException {
+    TGetNullPartitionNameRequest request = new TGetNullPartitionNameRequest();
+    TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse();
+    try {
+      byte[] result = GetNullPartitionName(
+          new TSerializer(new TBinaryProtocol.Factory()).serialize(request));
+      Preconditions.checkNotNull(result);
+      new TDeserializer(new TBinaryProtocol.Factory()).deserialize(response, result);
+      if (response.getStatus().getStatus_code() != TErrorCode.OK) {
+        throw new InternalException("Error requesting GetNullPartitionName: "
+            + Joiner.on("\n").join(response.getStatus().getError_msgs()));
+      }
+      Preconditions.checkNotNull(response.partition_value);
+      return response.partition_value;
+    } catch (TException e) {
+      // this should never happen
+      throw new InternalException("Error processing request: " + e.getMessage(), e);
+    }
+  }
+
+  public static byte[] GetLatestCompactions(byte[] thriftReq) {
+    try {
+      return NativeGetLatestCompactions(thriftReq);
+    } catch (UnsatisfiedLinkError e) { loadLibrary(); }
+    return NativeGetLatestCompactions(thriftReq);
+  }
+
   /**
    * Calling this function before loadLibrary() causes external frontend
    * initialization to be used during NativeFeInit()
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 1adfd7efa..c25faabaa 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -68,6 +68,9 @@ import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
 import org.apache.impala.thrift.TGetFunctionsResponse;
+import org.apache.impala.thrift.TGetLatestCompactionsRequest;
+import org.apache.impala.thrift.TGetLatestCompactionsResponse;
+import org.apache.impala.thrift.TGetNullPartitionNameResponse;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartitionStatsRequest;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
@@ -85,6 +88,7 @@ import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.AuthorizationUtil;
 import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.GlogAppender;
+import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.impala.util.TUniqueIdUtil;
@@ -562,4 +566,32 @@ public class JniCatalog {
   public void refreshDataSources() throws TException {
     catalog_.refreshDataSources();
   }
+
+  public byte[] getNullPartitionName() throws ImpalaException, TException {
+    return execAndSerialize("getNullPartitionName", "Getting null partition name", () -> {
+      TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse();
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        response.setPartition_value(
+            MetaStoreUtil.getNullPartitionKeyValue(msClient.getHiveClient()));
+        response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
+      }
+      return response;
+    });
+  }
+
+  public byte[] getLatestCompactions(byte[] thriftParams)
+      throws ImpalaException, TException {
+    TGetLatestCompactionsRequest request = new TGetLatestCompactionsRequest();
+    JniUtil.deserializeThrift(protocolFactory_, request, thriftParams);
+    return execAndSerialize("getLatestCompactions", "Getting latest compactions", () -> {
+      TGetLatestCompactionsResponse response = new TGetLatestCompactionsResponse();
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        response.setPartition_to_compaction_id(MetastoreShim.getLatestCompactions(
+            msClient, request.db_name, request.table_name, request.partition_names,
+            request.non_parition_name, request.last_compaction_id));
+        response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
+      }
+      return response;
+    });
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index d0cc54deb..57daa7765 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -611,6 +611,21 @@ public class CatalogdMetaProviderTest {
     }
   }
 
+  @Test
+  public void testLoadNullPartitionKeyValue() throws Exception {
+    provider_.cache_.invalidateAll();
+    CacheStats stats = diffStats();
+    String nullPartitionName = provider_.loadNullPartitionKeyValue();
+    assertNotNull(nullPartitionName);
+    stats = diffStats();
+    assertEquals(1, stats.missCount());
+    assertEquals(0, stats.hitCount());
+    assertEquals(nullPartitionName, provider_.loadNullPartitionKeyValue());
+    stats = diffStats();
+    assertEquals(0, stats.missCount());
+    assertEquals(1, stats.hitCount());
+  }
+
   private void testFileMetadataAfterCompaction(String dbName, String tableName,
       String partition, boolean isMajorCompaction) throws Exception {
     String tableOrPartition = dbName + "." + tableName + " " + partition;