You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/04 19:39:56 UTC
(impala) 01/03: IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider
This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 142ae261e319c3b602128deb1329713d5253637d
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Thu Dec 14 12:45:34 2023 +0530
IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider
This commit removes CatalogdMetaProvider's dependence on
DirectMetaProvider. CatalogdMetaProvider depended on DirectMetaProvider
for two APIs (loading the null partition key value and checking the
latest compactions). Those APIs are now implemented on the catalog
server, and CatalogdMetaProvider calls them instead. DirectMetaProvider
is no longer referenced anywhere, but it is retained for future use.
Testing:
- Tested manually; CatalogdMetaProviderTest (including the new
  testLoadNullPartitionKeyValue) covers these changes.
Change-Id: I096c1b1d1a52e979c8b2d8173dae9ca2cc6c36d2
Reviewed-on: http://gerrit.cloudera.org:8080/20791
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/catalog/catalog-server.cc | 28 ++++++++
be/src/catalog/catalog-service-client-wrapper.h | 16 +++++
be/src/catalog/catalog.cc | 11 ++++
be/src/catalog/catalog.h | 9 +++
be/src/exec/catalog-op-executor.cc | 28 ++++++++
be/src/exec/catalog-op-executor.h | 8 +++
be/src/service/fe-support.cc | 48 ++++++++++++++
common/thrift/CatalogService.thrift | 35 ++++++++++
.../org/apache/impala/compat/MetastoreShim.java | 9 +++
.../org/apache/impala/compat/MetastoreShim.java | 77 ++++++++++++----------
.../impala/catalog/local/CatalogdMetaProvider.java | 67 ++++++++++++++++---
.../java/org/apache/impala/service/FeSupport.java | 42 ++++++++++++
.../java/org/apache/impala/service/JniCatalog.java | 32 +++++++++
.../catalog/local/CatalogdMetaProviderTest.java | 15 +++++
14 files changed, 382 insertions(+), 43 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 43f77e970..cc1667731 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -363,6 +363,34 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
VLOG_RPC << "UpdateTableUsage(): response.status=" << resp.status;
}
+ void GetNullPartitionName(TGetNullPartitionNameResponse& resp,
+ const TGetNullPartitionNameRequest& req) override {
+ VLOG_RPC << "GetNullPartitionName(): request=" << ThriftDebugString(req);
+ Status status = CheckProtocolVersion(req.protocol_version);
+ if (status.ok()) {
+ status = catalog_server_->catalog()->GetNullPartitionName(&resp);
+ }
+ if (!status.ok()) LOG(ERROR) << status.GetDetail();
+ TStatus thrift_status;
+ status.ToThrift(&thrift_status);
+ resp.__set_status(thrift_status);
+ VLOG_RPC << "GetNullPartitionName(): response=" << ThriftDebugStringNoThrow(resp);
+ }
+
+ void GetLatestCompactions(TGetLatestCompactionsResponse& resp,
+ const TGetLatestCompactionsRequest& req) override {
+ VLOG_RPC << "GetLatestCompactions(): request=" << ThriftDebugString(req);
+ Status status = CheckProtocolVersion(req.protocol_version);
+ if (status.ok()) {
+ status = catalog_server_->catalog()->GetLatestCompactions(req, &resp);
+ }
+ if (!status.ok()) LOG(ERROR) << status.GetDetail();
+ TStatus thrift_status;
+ status.ToThrift(&thrift_status);
+ resp.__set_status(thrift_status);
+ VLOG_RPC << "GetLatestCompactions(): response=" << ThriftDebugStringNoThrow(resp);
+ }
+
private:
CatalogServer* catalog_server_;
diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h
index 294960771..6775e4594 100644
--- a/be/src/catalog/catalog-service-client-wrapper.h
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -110,6 +110,22 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
*send_done = true;
recv_UpdateTableUsage(_return);
}
+
+ void GetNullPartitionName(TGetNullPartitionNameResponse& _return,
+ const TGetNullPartitionNameRequest& req, bool* send_done) {
+ DCHECK(!*send_done);
+ send_GetNullPartitionName(req);
+ *send_done = true;
+ recv_GetNullPartitionName(_return);
+ }
+
+ void GetLatestCompactions(TGetLatestCompactionsResponse& _return,
+ const TGetLatestCompactionsRequest& req, bool* send_done) {
+ DCHECK(!*send_done);
+ send_GetLatestCompactions(req);
+ *send_done = true;
+ recv_GetLatestCompactions(_return);
+ }
#pragma clang diagnostic pop
};
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 698091787..92b306b51 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -71,6 +71,8 @@ Catalog::Catalog() {
{"updateTableUsage", "([B)V", &update_table_usage_id_},
{"regenerateServiceId", "()V", &regenerate_service_id_},
{"refreshDataSources", "()V", &refresh_data_sources_},
+ {"getNullPartitionName", "()[B", &get_null_partition_name_id_},
+ {"getLatestCompactions", "([B)[B", &get_latest_compactions_id_},
};
JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -220,3 +222,12 @@ void Catalog::RegenerateServiceId() {
Status Catalog::RefreshDataSources() {
return JniUtil::CallJniMethod(catalog_, refresh_data_sources_);
}
+
+Status Catalog::GetNullPartitionName(TGetNullPartitionNameResponse* resp) {
+ return JniUtil::CallJniMethod(catalog_, get_null_partition_name_id_, resp);
+}
+
+Status Catalog::GetLatestCompactions(
+ const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp) {
+ return JniUtil::CallJniMethod(catalog_, get_latest_compactions_id_, req, resp);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index c94f09a6b..f102a2908 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -137,6 +137,13 @@ class Catalog {
/// report.
Status UpdateTableUsage(const TUpdateTableUsageRequest& req);
+ /// Gets the null partition name.
+ Status GetNullPartitionName(TGetNullPartitionNameResponse* resp);
+
+ /// Gets the latest compactions for the request.
+ Status GetLatestCompactions(
+ const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp);
+
/// Regenerate Catalog Service ID.
/// The function should be called when the CatalogD becomes active.
void RegenerateServiceId();
@@ -170,6 +177,8 @@ class Catalog {
jmethodID update_table_usage_id_;
jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId()
jmethodID refresh_data_sources_; // JniCatalog.refreshDataSources()
+ jmethodID get_null_partition_name_id_; // JniCatalog.getNullPartitionName()
+ jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions()
};
}
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index a7fb2381b..074d828e3 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -422,3 +422,31 @@ Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
RETURN_IF_ERROR(rpc_status.status);
return Status::OK();
}
+
+Status CatalogOpExecutor::GetNullPartitionName(
+ const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result) {
+ int attempt = 0; // Used for debug action only.
+ CatalogServiceConnection::RpcStatus rpc_status =
+ CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+ &CatalogServiceClientWrapper::GetNullPartitionName, req,
+ FLAGS_catalog_client_connection_num_retries,
+ FLAGS_catalog_client_rpc_retry_interval_ms,
+ [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
+ RETURN_IF_ERROR(rpc_status.status);
+ return Status::OK();
+}
+
+Status CatalogOpExecutor::GetLatestCompactions(
+ const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result) {
+ int attempt = 0; // Used for debug action only.
+ CatalogServiceConnection::RpcStatus rpc_status =
+ CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+ &CatalogServiceClientWrapper::GetLatestCompactions, req,
+ FLAGS_catalog_client_connection_num_retries,
+ FLAGS_catalog_client_rpc_retry_interval_ms,
+ [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
+ RETURN_IF_ERROR(rpc_status.status);
+ return Status::OK();
+}
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index cab823bc6..12bf2e0f1 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -80,6 +80,14 @@ class CatalogOpExecutor {
Status UpdateTableUsage(const TUpdateTableUsageRequest& req,
TUpdateTableUsageResponse* resp);
+ /// Makes an RPC to the catalog server to get the null partition name.
+ Status GetNullPartitionName(
+ const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result);
+
+ /// Makes an RPC to the catalog server to get the latest compactions.
+ Status GetLatestCompactions(
+ const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result);
+
/// Set in Exec(), returns a pointer to the TDdlExecResponse of the DDL execution.
/// If called before Exec(), this will return NULL. Only set if the
/// TCatalogOpType is DDL.
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index a31979ce3..f4bee52e6 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -695,6 +695,46 @@ Java_org_apache_impala_service_FeSupport_nativeParseDateString(JNIEnv* env,
return result_bytes;
}
+// Native method to make a request to catalog server to get the null partition name.
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName(
+ JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) {
+ TGetNullPartitionNameRequest request;
+ THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+ JniUtil::internal_exc_class(), nullptr);
+ CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+ TGetNullPartitionNameResponse result;
+ Status status = catalog_op_executor.GetNullPartitionName(request, &result);
+ if (!status.ok()) {
+ LOG(ERROR) << status.GetDetail();
+ status.ToThrift(&result.status);
+ }
+ jbyteArray result_bytes = nullptr;
+ THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+ JniUtil::internal_exc_class(), result_bytes);
+ return result_bytes;
+}
+
+// Native method to make a request to catalog server to get the latest compactions.
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions(
+ JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) {
+ TGetLatestCompactionsRequest request;
+ THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+ JniUtil::internal_exc_class(), nullptr);
+ CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+ TGetLatestCompactionsResponse result;
+ Status status = catalog_op_executor.GetLatestCompactions(request, &result);
+ if (!status.ok()) {
+ LOG(ERROR) << status.GetDetail();
+ status.ToThrift(&result.status);
+ }
+ jbyteArray result_bytes = nullptr;
+ THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+ JniUtil::internal_exc_class(), result_bytes);
+ return result_bytes;
+}
+
namespace impala {
static JNINativeMethod native_methods[] = {
@@ -770,6 +810,14 @@ static JNINativeMethod native_methods[] = {
const_cast<char*>("(Ljava/lang/String;)[B"),
(void*)::Java_org_apache_impala_service_FeSupport_nativeParseDateString
},
+ {
+ const_cast<char*>("NativeGetNullPartitionName"), const_cast<char*>("([B)[B"),
+ (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName
+ },
+ {
+ const_cast<char*>("NativeGetLatestCompactions"), const_cast<char*>("([B)[B"),
+ (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions
+ },
};
void InitFeSupport(bool disable_codegen) {
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 99d210d06..701f10641 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -666,6 +666,35 @@ struct TGetPartitionStatsResponse {
2: optional map<string, binary> partition_stats
}
+// Request null partition name.
+struct TGetNullPartitionNameRequest {
+ 1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+}
+
+// Response for null partition name request.
+struct TGetNullPartitionNameResponse {
+ 1: required Status.TStatus status
+ // Null partition name.
+ 2: required string partition_value
+}
+
+// Request latest compactions.
+struct TGetLatestCompactionsRequest {
+ 1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+ 2: required string db_name
+ 3: required string table_name
+ 4: required string non_parition_name
+ 5: optional list<string> partition_names
+ 6: required i64 last_compaction_id
+}
+
+// Response for latest compactions request.
+struct TGetLatestCompactionsResponse {
+ 1: required Status.TStatus status
+ // Map of partition name to the compaction id
+ 2: required map<string, i64> partition_to_compaction_id
+}
+
// Instructs the Catalog Server to prioritizing loading of metadata for the specified
// catalog objects. Currently only used for controlling the priority of loading
// tables/views since Db/Function metadata is loaded on startup.
@@ -736,4 +765,10 @@ service CatalogService {
// Update recently used tables and their usage counts in an impalad since the last
// report.
TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req);
+
+ // Gets the null partition name used at HMS.
+ TGetNullPartitionNameResponse GetNullPartitionName(1: TGetNullPartitionNameRequest req);
+
+ // Gets the latest compactions.
+ TGetLatestCompactionsResponse GetLatestCompactions(1: TGetLatestCompactionsRequest req);
}
diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 038f6eb1e..21f0a7266 100644
--- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -589,6 +589,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
"getPartitionsForRefreshingFileMetadata is not supported.");
}
+ /**
+ * CDP Hive-3 only function.
+ */
+ public static Map<String, Long> getLatestCompactions(MetaStoreClient client,
+ String dbName, String tableName, List<String> partitionNames,
+ String unPartitionedName, long lastCompactionId) throws TException {
+ throw new UnsupportedOperationException("getLatestCompactions is not supported.");
+ }
+
/**
* CDP Hive-3 only function.
*/
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index d3f33e8cb..a1ea42fe8 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -743,39 +743,19 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
}
/**
- * Fetches the latest compaction id from HMS and compares with partition metadata in
- * cache. If a partition is stale due to compaction, removes it from metas.
+ * Fetches the latest compactions from HMS
*/
- public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
- String dbName, String tableName, TableMetaRef table,
- Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
- throws TException {
- Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
- Preconditions.checkNotNull(metas, "Partition map must be non-null");
- if (metas.isEmpty()) {
- return Collections.emptyList();
- }
- Stopwatch sw = Stopwatch.createStarted();
- List<PartitionRef> stalePartitions = new ArrayList<>();
- if (!table.isTransactional() || metas.isEmpty()) return stalePartitions;
+ public static Map<String, Long> getLatestCompactions(MetaStoreClient client,
+ String dbName, String tableName, List<String> partitionNames,
+ String unPartitionedName, long lastCompactionId) throws TException {
GetLatestCommittedCompactionInfoRequest request =
new GetLatestCommittedCompactionInfoRequest(dbName, tableName);
- if (table.isPartitioned()) {
- request.setPartitionnames(metas.keySet().stream()
- .map(PartitionRef::getName).collect(Collectors.toList()));
- }
- long lastCompactionId = metas.values().stream()
- .mapToLong(p -> p.getLastCompactionId()).max().orElse(-1);
- if (lastCompactionId > 0) {
- request.setLastCompactionId(lastCompactionId);
- }
-
+ request.setPartitionnames(partitionNames);
+ if (lastCompactionId > 0) request.setLastCompactionId(lastCompactionId);
GetLatestCommittedCompactionInfoResponse response;
- try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
- response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
- }
+ response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
Map<String, Long> partNameToCompactionId = new HashMap<>();
- if (table.isPartitioned()) {
+ if (partitionNames != null) {
for (CompactionInfoStruct ci : response.getCompactions()) {
if (ci.getPartitionname() != null) {
partNameToCompactionId.put(ci.getPartitionname(), ci.getId());
@@ -786,14 +766,41 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
}
}
} else {
- CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(),
- null);
- if (ci != null) {
- partNameToCompactionId.put(unPartitionedName, ci.getId());
- }
+ CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(), null);
+ if (ci != null) partNameToCompactionId.put(unPartitionedName, ci.getId());
+ }
+ return partNameToCompactionId;
+ }
+
+ /**
+ * Fetches the latest compaction id from HMS and compares with partition metadata in
+ * cache. If a partition is stale due to compaction, removes it from metas.
+ */
+ public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
+ String dbName, String tableName, TableMetaRef table,
+ Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
+ throws TException {
+ Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+ Preconditions.checkNotNull(metas, "Partition map must be non-null");
+ if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList();
+ Stopwatch sw = Stopwatch.createStarted();
+ List<String> partitionNames = null;
+ if (table.isPartitioned()) {
+ partitionNames =
+ metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList());
}
- Iterator<Entry<PartitionRef, PartitionMetadata>> iter =
- metas.entrySet().iterator();
+ long lastCompactionId = metas.values()
+ .stream()
+ .mapToLong(PartitionMetadata::getLastCompactionId)
+ .max()
+ .orElse(-1);
+ Map<String, Long> partNameToCompactionId = Collections.emptyMap();
+ try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
+ partNameToCompactionId = getLatestCompactions(
+ client, dbName, tableName, partitionNames, unPartitionedName, lastCompactionId);
+ }
+ List<PartitionRef> stalePartitions = new ArrayList<>();
+ Iterator<Entry<PartitionRef, PartitionMetadata>> iter = metas.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index cbfbfbf96..2a7b611b1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -20,8 +20,10 @@ package org.apache.impala.catalog.local;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -63,6 +65,7 @@ import org.apache.impala.catalog.HdfsCachePool;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
import org.apache.impala.catalog.Principal;
import org.apache.impala.catalog.PrincipalPrivilege;
@@ -71,6 +74,7 @@ import org.apache.impala.catalog.VirtualColumn;
import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
+import org.apache.impala.common.PrintUtils;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.service.FrontendProfile;
@@ -88,6 +92,8 @@ import org.apache.impala.thrift.TDbInfoSelector;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TFunctionName;
+import org.apache.impala.thrift.TGetLatestCompactionsRequest;
+import org.apache.impala.thrift.TGetLatestCompactionsResponse;
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
import org.apache.impala.thrift.THdfsFileDesc;
@@ -117,6 +123,7 @@ import org.github.jamm.MemoryMeter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@@ -261,11 +268,6 @@ public class CatalogdMetaProvider implements MetaProvider {
private final ListMap<TNetworkAddress> cacheHostIndex_ =
new ListMap<TNetworkAddress>();
- // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
- // of metadata. In order to incrementally build this out, we delegate various calls
- // to the "direct" provider for now and circumvent catalogd.
- private DirectMetaProvider directProvider_ = new DirectMetaProvider();
-
/**
* Number of requests which piggy-backed on a concurrent request for the same key,
* and resulted in success. Used only for test assertions.
@@ -948,8 +950,8 @@ public class CatalogdMetaProvider implements MetaProvider {
hostIndex, partitionRefs);
if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
// If any partitions are stale after compaction, they will be removed from refToMeta
- List<PartitionRef> stalePartitions = directProvider_.checkLatestCompaction(
- refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
+ List<PartitionRef> stalePartitions =
+ checkLatestCompaction(refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
cache_.invalidateAll(stalePartitions.stream()
.map(PartitionRefImpl.class::cast)
.map(PartitionRefImpl::getId)
@@ -988,6 +990,55 @@ public class CatalogdMetaProvider implements MetaProvider {
return nameToMeta;
}
+ /**
+ * Fetches the latest compactions from catalogd and compares with partition metadata in
+ * cache. If a partition is stale due to compaction, removes it from metas. And return
+ * the stale partitions.
+ */
+ private List<PartitionRef> checkLatestCompaction(String dbName, String tableName,
+ TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException {
+ Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+ Preconditions.checkNotNull(metas, "Partition map must be non-null");
+ if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList();
+ Stopwatch sw = Stopwatch.createStarted();
+ TGetLatestCompactionsRequest req = new TGetLatestCompactionsRequest();
+ req.db_name = dbName;
+ req.table_name = tableName;
+ req.non_parition_name = HdfsTable.DEFAULT_PARTITION_NAME;
+ if (table.isPartitioned()) {
+ req.partition_names =
+ metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList());
+ }
+ req.last_compaction_id = metas.values()
+ .stream()
+ .mapToLong(PartitionMetadata::getLastCompactionId)
+ .max()
+ .orElse(-1);
+ byte[] ret = FeSupport.GetLatestCompactions(
+ new TSerializer(new TBinaryProtocol.Factory()).serialize(req));
+ TGetLatestCompactionsResponse resp = new TGetLatestCompactionsResponse();
+ new TDeserializer(new TBinaryProtocol.Factory()).deserialize(resp, ret);
+ if (resp.status.status_code != TErrorCode.OK) {
+ throw new TException(Joiner.on("\n").join(resp.status.getError_msgs()));
+ }
+ Map<String, Long> partNameToCompactionId = resp.partition_to_compaction_id;
+ Preconditions.checkNotNull(
+ partNameToCompactionId, "Partition name to compaction id map must be non-null");
+ List<PartitionRef> stalePartitions = new ArrayList<>();
+ Iterator<Map.Entry<PartitionRef, PartitionMetadata>> iter =
+ metas.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
+ if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
+ stalePartitions.add(entry.getKey());
+ iter.remove();
+ }
+ }
+ LOG.info("Checked the latest compaction info for {}.{}. Time taken: {}", dbName,
+ tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
+ return stalePartitions;
+ }
+
/**
* Load the specified partitions 'prefs' from catalogd. The partitions are made
* relative to the given 'hostIndex' before being returned.
@@ -1189,7 +1240,7 @@ public class CatalogdMetaProvider implements MetaProvider {
/** Called to load cache for cache misses */
@Override
public String call() throws Exception {
- return directProvider_.loadNullPartitionKeyValue();
+ return FeSupport.GetNullPartitionName();
}
});
}
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index e8ea8430a..4b56ca15c 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -36,6 +36,8 @@ import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TExprBatch;
+import org.apache.impala.thrift.TGetNullPartitionNameRequest;
+import org.apache.impala.thrift.TGetNullPartitionNameResponse;
import org.apache.impala.thrift.TGetPartitionStatsRequest;
import org.apache.impala.thrift.TGetPartitionStatsResponse;
import org.apache.impala.thrift.TParseDateStringResult;
@@ -140,6 +142,12 @@ public class FeSupport {
// E.g.: '2011-01-01', '2011-01-1', '2011-1-01', '2011-01-01'.
public native static byte[] nativeParseDateString(String date);
+ // Does an RPC to the Catalog Server to get the null partition name.
+ public native static byte[] NativeGetNullPartitionName(byte[] thriftReq);
+
+ // Does an RPC to the Catalog Server to get the latest compactions.
+ public native static byte[] NativeGetLatestCompactions(byte[] thriftReq);
+
/**
* Locally caches the jar at the specified HDFS location.
*
@@ -490,6 +498,40 @@ public class FeSupport {
}
}
+ private static byte[] GetNullPartitionName(byte[] thriftReq) {
+ try {
+ return NativeGetNullPartitionName(thriftReq);
+ } catch (UnsatisfiedLinkError e) { loadLibrary(); }
+ return NativeGetNullPartitionName(thriftReq);
+ }
+
+ public static String GetNullPartitionName() throws InternalException {
+ TGetNullPartitionNameRequest request = new TGetNullPartitionNameRequest();
+ TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse();
+ try {
+ byte[] result = GetNullPartitionName(
+ new TSerializer(new TBinaryProtocol.Factory()).serialize(request));
+ Preconditions.checkNotNull(result);
+ new TDeserializer(new TBinaryProtocol.Factory()).deserialize(response, result);
+ if (response.getStatus().getStatus_code() != TErrorCode.OK) {
+ throw new InternalException("Error requesting GetNullPartitionName: "
+ + Joiner.on("\n").join(response.getStatus().getError_msgs()));
+ }
+ Preconditions.checkNotNull(response.partition_value);
+ return response.partition_value;
+ } catch (TException e) {
+ // this should never happen
+ throw new InternalException("Error processing request: " + e.getMessage(), e);
+ }
+ }
+
+ public static byte[] GetLatestCompactions(byte[] thriftReq) {
+ try {
+ return NativeGetLatestCompactions(thriftReq);
+ } catch (UnsatisfiedLinkError e) { loadLibrary(); }
+ return NativeGetLatestCompactions(thriftReq);
+ }
+
/**
* Calling this function before loadLibrary() causes external frontend
* initialization to be used during NativeFeInit()
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 1adfd7efa..c25faabaa 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -68,6 +68,9 @@ import org.apache.impala.thrift.TGetDbsParams;
import org.apache.impala.thrift.TGetDbsResult;
import org.apache.impala.thrift.TGetFunctionsRequest;
import org.apache.impala.thrift.TGetFunctionsResponse;
+import org.apache.impala.thrift.TGetLatestCompactionsRequest;
+import org.apache.impala.thrift.TGetLatestCompactionsResponse;
+import org.apache.impala.thrift.TGetNullPartitionNameResponse;
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
import org.apache.impala.thrift.TGetPartitionStatsRequest;
import org.apache.impala.thrift.TGetPartitionStatsResponse;
@@ -85,6 +88,7 @@ import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.AuthorizationUtil;
import org.apache.impala.util.CatalogOpUtil;
import org.apache.impala.util.GlogAppender;
+import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.impala.util.TUniqueIdUtil;
@@ -562,4 +566,32 @@ public class JniCatalog {
public void refreshDataSources() throws TException {
catalog_.refreshDataSources();
}
+
+ public byte[] getNullPartitionName() throws ImpalaException, TException {
+ return execAndSerialize("getNullPartitionName", "Getting null partition name", () -> {
+ TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse();
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ response.setPartition_value(
+ MetaStoreUtil.getNullPartitionKeyValue(msClient.getHiveClient()));
+ response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
+ }
+ return response;
+ });
+ }
+
+ public byte[] getLatestCompactions(byte[] thriftParams)
+ throws ImpalaException, TException {
+ TGetLatestCompactionsRequest request = new TGetLatestCompactionsRequest();
+ JniUtil.deserializeThrift(protocolFactory_, request, thriftParams);
+ return execAndSerialize("getLatestCompactions", "Getting latest compactions", () -> {
+ TGetLatestCompactionsResponse response = new TGetLatestCompactionsResponse();
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ response.setPartition_to_compaction_id(MetastoreShim.getLatestCompactions(
+ msClient, request.db_name, request.table_name, request.partition_names,
+ request.non_parition_name, request.last_compaction_id));
+ response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
+ }
+ return response;
+ });
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index d0cc54deb..57daa7765 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -611,6 +611,21 @@ public class CatalogdMetaProviderTest {
}
}
+ @Test
+ public void testLoadNullPartitionKeyValue() throws Exception {
+ provider_.cache_.invalidateAll();
+ CacheStats stats = diffStats();
+ String nullPartitionName = provider_.loadNullPartitionKeyValue();
+ assertNotNull(nullPartitionName);
+ stats = diffStats();
+ assertEquals(1, stats.missCount());
+ assertEquals(0, stats.hitCount());
+ assertEquals(nullPartitionName, provider_.loadNullPartitionKeyValue());
+ stats = diffStats();
+ assertEquals(0, stats.missCount());
+ assertEquals(1, stats.hitCount());
+ }
+
private void testFileMetadataAfterCompaction(String dbName, String tableName,
String partition, boolean isMajorCompaction) throws Exception {
String tableOrPartition = dbName + "." + tableName + " " + partition;