You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/04 19:39:55 UTC

(impala) branch master updated (9f01c9bef -> dd8ddf77c)

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 9f01c9bef IMPALA-12673: Table migration fails if partition contains '/'
     new 142ae261e IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider
     new 324a1aa37 IMPALA-11553: Add event specific metrics in the table metrics
     new dd8ddf77c IMPALA-12668: Enable clang-tidy checks for implicit fallthrough

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .clang-tidy                                        |  1 -
 be/src/benchmarks/atoi-benchmark.cc                | 19 +++++-
 be/src/catalog/catalog-server.cc                   | 28 ++++++++
 be/src/catalog/catalog-service-client-wrapper.h    | 16 +++++
 be/src/catalog/catalog.cc                          | 11 ++++
 be/src/catalog/catalog.h                           |  9 +++
 be/src/codegen/codegen-anyval-read-write-info.cc   |  1 +
 be/src/codegen/codegen-anyval.cc                   |  1 +
 be/src/exec/catalog-op-executor.cc                 | 28 ++++++++
 be/src/exec/catalog-op-executor.h                  |  8 +++
 be/src/exec/orc/hdfs-orc-scanner.cc                |  1 +
 be/src/exec/parquet/parquet-column-stats.cc        |  1 +
 be/src/exprs/agg-fn-evaluator.cc                   |  4 +-
 be/src/exprs/anyval-util.h                         |  3 +-
 be/src/runtime/datetime-parser-common.cc           |  1 +
 be/src/runtime/descriptors.cc                      |  2 +
 be/src/runtime/io/disk-file.cc                     |  2 +
 be/src/runtime/raw-value.cc                        |  1 +
 be/src/runtime/raw-value.inline.h                  |  3 +-
 be/src/runtime/runtime-filter-ir.cc                |  2 +
 be/src/runtime/types.cc                            |  1 +
 be/src/service/fe-support.cc                       | 48 ++++++++++++++
 be/src/service/query-result-set.cc                 |  1 +
 be/src/util/bit-packing.inline.h                   | 19 ++++--
 be/src/util/hash-util.h                            | 56 +++++++++++-----
 be/src/util/jwt-util.cc                            |  3 +-
 be/src/util/redactor.cc                            |  1 +
 be/src/util/string-parser.h                        |  7 +-
 bin/run_clang_tidy.sh                              |  7 +-
 common/thrift/CatalogService.thrift                | 35 ++++++++++
 .../org/apache/impala/compat/MetastoreShim.java    |  9 +++
 .../org/apache/impala/compat/MetastoreShim.java    | 77 ++++++++++++----------
 .../main/java/org/apache/impala/catalog/Table.java | 10 +++
 .../impala/catalog/events/MetastoreEvents.java     | 73 +++++++++++++-------
 .../catalog/events/MetastoreEventsProcessor.java   |  9 +++
 .../impala/catalog/local/CatalogdMetaProvider.java | 67 ++++++++++++++++---
 .../java/org/apache/impala/service/FeSupport.java  | 42 ++++++++++++
 .../java/org/apache/impala/service/JniCatalog.java | 32 +++++++++
 .../catalog/local/CatalogdMetaProviderTest.java    | 15 +++++
 tests/custom_cluster/test_events_custom_configs.py |  4 +-
 tests/webserver/test_web_pages.py                  |  4 ++
 41 files changed, 563 insertions(+), 99 deletions(-)


(impala) 01/03: IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 142ae261e319c3b602128deb1329713d5253637d
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Thu Dec 14 12:45:34 2023 +0530

    IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider
    
    This commit removes CatalogdMetaProvider's dependence on
    DirectMetaProvider. CatalogdMetaProvider depended on DirectMetaProvider
    for two APIs (loading the null partition key value and checking the
    latest compactions). Both are now implemented on the catalog server
    and called through it instead. DirectMetaProvider is no longer
    referenced anywhere, but it is retained for future use.
    
    Testing:
    - Tested manually; the change is also covered by CatalogdMetaProviderTest.
    
    Change-Id: I096c1b1d1a52e979c8b2d8173dae9ca2cc6c36d2
    Reviewed-on: http://gerrit.cloudera.org:8080/20791
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 28 ++++++++
 be/src/catalog/catalog-service-client-wrapper.h    | 16 +++++
 be/src/catalog/catalog.cc                          | 11 ++++
 be/src/catalog/catalog.h                           |  9 +++
 be/src/exec/catalog-op-executor.cc                 | 28 ++++++++
 be/src/exec/catalog-op-executor.h                  |  8 +++
 be/src/service/fe-support.cc                       | 48 ++++++++++++++
 common/thrift/CatalogService.thrift                | 35 ++++++++++
 .../org/apache/impala/compat/MetastoreShim.java    |  9 +++
 .../org/apache/impala/compat/MetastoreShim.java    | 77 ++++++++++++----------
 .../impala/catalog/local/CatalogdMetaProvider.java | 67 ++++++++++++++++---
 .../java/org/apache/impala/service/FeSupport.java  | 42 ++++++++++++
 .../java/org/apache/impala/service/JniCatalog.java | 32 +++++++++
 .../catalog/local/CatalogdMetaProviderTest.java    | 15 +++++
 14 files changed, 382 insertions(+), 43 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 43f77e970..cc1667731 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -363,6 +363,34 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
     VLOG_RPC << "UpdateTableUsage(): response.status=" << resp.status;
   }
 
+  void GetNullPartitionName(TGetNullPartitionNameResponse& resp,
+      const TGetNullPartitionNameRequest& req) override {
+    VLOG_RPC << "GetNullPartitionName(): request=" << ThriftDebugString(req);
+    Status status = CheckProtocolVersion(req.protocol_version);
+    if (status.ok()) {
+      status = catalog_server_->catalog()->GetNullPartitionName(&resp);
+    }
+    if (!status.ok()) LOG(ERROR) << status.GetDetail();
+    TStatus thrift_status;
+    status.ToThrift(&thrift_status);
+    resp.__set_status(thrift_status);
+    VLOG_RPC << "GetNullPartitionName(): response=" << ThriftDebugStringNoThrow(resp);
+  }
+
+  void GetLatestCompactions(TGetLatestCompactionsResponse& resp,
+      const TGetLatestCompactionsRequest& req) override {
+    VLOG_RPC << "GetLatestCompactions(): request=" << ThriftDebugString(req);
+    Status status = CheckProtocolVersion(req.protocol_version);
+    if (status.ok()) {
+      status = catalog_server_->catalog()->GetLatestCompactions(req, &resp);
+    }
+    if (!status.ok()) LOG(ERROR) << status.GetDetail();
+    TStatus thrift_status;
+    status.ToThrift(&thrift_status);
+    resp.__set_status(thrift_status);
+    VLOG_RPC << "GetLatestCompactions(): response=" << ThriftDebugStringNoThrow(resp);
+  }
+
  private:
   CatalogServer* catalog_server_;
 
diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h
index 294960771..6775e4594 100644
--- a/be/src/catalog/catalog-service-client-wrapper.h
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -110,6 +110,22 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
     *send_done = true;
     recv_UpdateTableUsage(_return);
   }
+
+  void GetNullPartitionName(TGetNullPartitionNameResponse& _return,
+      const TGetNullPartitionNameRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetNullPartitionName(req);
+    *send_done = true;
+    recv_GetNullPartitionName(_return);
+  }
+
+  void GetLatestCompactions(TGetLatestCompactionsResponse& _return,
+      const TGetLatestCompactionsRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetLatestCompactions(req);
+    *send_done = true;
+    recv_GetLatestCompactions(_return);
+  }
 #pragma clang diagnostic pop
 };
 
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 698091787..92b306b51 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -71,6 +71,8 @@ Catalog::Catalog() {
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
     {"regenerateServiceId", "()V", &regenerate_service_id_},
     {"refreshDataSources", "()V", &refresh_data_sources_},
+    {"getNullPartitionName", "()[B", &get_null_partition_name_id_},
+    {"getLatestCompactions", "([B)[B", &get_latest_compactions_id_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -220,3 +222,12 @@ void Catalog::RegenerateServiceId() {
 Status Catalog::RefreshDataSources() {
   return JniUtil::CallJniMethod(catalog_, refresh_data_sources_);
 }
+
+Status Catalog::GetNullPartitionName(TGetNullPartitionNameResponse* resp) {
+  return JniUtil::CallJniMethod(catalog_, get_null_partition_name_id_, resp);
+}
+
+Status Catalog::GetLatestCompactions(
+    const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp) {
+  return JniUtil::CallJniMethod(catalog_, get_latest_compactions_id_, req, resp);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index c94f09a6b..f102a2908 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -137,6 +137,13 @@ class Catalog {
   /// report.
   Status UpdateTableUsage(const TUpdateTableUsageRequest& req);
 
+  /// Gets the null partition name.
+  Status GetNullPartitionName(TGetNullPartitionNameResponse* resp);
+
+  /// Gets the latest compactions for the request.
+  Status GetLatestCompactions(
+      const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp);
+
   /// Regenerate Catalog Service ID.
   /// The function should be called when the CatalogD becomes active.
   void RegenerateServiceId();
@@ -170,6 +177,8 @@ class Catalog {
   jmethodID update_table_usage_id_;
   jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId()
   jmethodID refresh_data_sources_; // JniCatalog.refreshDataSources()
+  jmethodID get_null_partition_name_id_; // JniCatalog.getNullPartitionName()
+  jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions()
 };
 
 }
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index a7fb2381b..074d828e3 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -422,3 +422,31 @@ Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
   RETURN_IF_ERROR(rpc_status.status);
   return Status::OK();
 }
+
+Status CatalogOpExecutor::GetNullPartitionName(
+    const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result) {
+  int attempt = 0; // Used for debug action only.
+  CatalogServiceConnection::RpcStatus rpc_status =
+      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+          *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+          &CatalogServiceClientWrapper::GetNullPartitionName, req,
+          FLAGS_catalog_client_connection_num_retries,
+          FLAGS_catalog_client_rpc_retry_interval_ms,
+          [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
+  RETURN_IF_ERROR(rpc_status.status);
+  return Status::OK();
+}
+
+Status CatalogOpExecutor::GetLatestCompactions(
+    const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result) {
+  int attempt = 0; // Used for debug action only.
+  CatalogServiceConnection::RpcStatus rpc_status =
+      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+          *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+          &CatalogServiceClientWrapper::GetLatestCompactions, req,
+          FLAGS_catalog_client_connection_num_retries,
+          FLAGS_catalog_client_rpc_retry_interval_ms,
+          [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
+  RETURN_IF_ERROR(rpc_status.status);
+  return Status::OK();
+}
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index cab823bc6..12bf2e0f1 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -80,6 +80,14 @@ class CatalogOpExecutor {
   Status UpdateTableUsage(const TUpdateTableUsageRequest& req,
       TUpdateTableUsageResponse* resp);
 
+  /// Makes an RPC to the catalog server to get the null partition name.
+  Status GetNullPartitionName(
+      const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result);
+
+  /// Makes an RPC to the catalog server to get the latest compactions.
+  Status GetLatestCompactions(
+      const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result);
+
   /// Set in Exec(), returns a pointer to the TDdlExecResponse of the DDL execution.
   /// If called before Exec(), this will return NULL. Only set if the
   /// TCatalogOpType is DDL.
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index a31979ce3..f4bee52e6 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -695,6 +695,46 @@ Java_org_apache_impala_service_FeSupport_nativeParseDateString(JNIEnv* env,
   return result_bytes;
 }
 
+// Native method to make a request to catalog server to get the null partition name.
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName(
+    JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) {
+  TGetNullPartitionNameRequest request;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TGetNullPartitionNameResponse result;
+  Status status = catalog_op_executor.GetNullPartitionName(request, &result);
+  if (!status.ok()) {
+    LOG(ERROR) << status.GetDetail();
+    status.ToThrift(&result.status);
+  }
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
+// Native method to make a request to catalog server to get the latest compactions.
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions(
+    JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) {
+  TGetLatestCompactionsRequest request;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TGetLatestCompactionsResponse result;
+  Status status = catalog_op_executor.GetLatestCompactions(request, &result);
+  if (!status.ok()) {
+    LOG(ERROR) << status.GetDetail();
+    status.ToThrift(&result.status);
+  }
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -770,6 +810,14 @@ static JNINativeMethod native_methods[] = {
     const_cast<char*>("(Ljava/lang/String;)[B"),
     (void*)::Java_org_apache_impala_service_FeSupport_nativeParseDateString
   },
+  {
+    const_cast<char*>("NativeGetNullPartitionName"), const_cast<char*>("([B)[B"),
+    (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName
+  },
+  {
+    const_cast<char*>("NativeGetLatestCompactions"), const_cast<char*>("([B)[B"),
+    (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 99d210d06..701f10641 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -666,6 +666,35 @@ struct TGetPartitionStatsResponse {
   2: optional map<string, binary> partition_stats
 }
 
+// Request null partition name.
+struct TGetNullPartitionNameRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+}
+
+// Response for null partition name request.
+struct TGetNullPartitionNameResponse {
+  1: required Status.TStatus status
+  // Null partition name.
+  2: required string partition_value
+}
+
+// Request latest compactions.
+struct TGetLatestCompactionsRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+  2: required string db_name
+  3: required string table_name
+  4: required string non_parition_name
+  5: optional list<string> partition_names
+  6: required i64 last_compaction_id
+}
+
+// Response for latest compactions request.
+struct TGetLatestCompactionsResponse {
+  1: required Status.TStatus status
+  // Map of partition name to the compaction id
+  2: required map<string, i64> partition_to_compaction_id
+}
+
 // Instructs the Catalog Server to prioritizing loading of metadata for the specified
 // catalog objects. Currently only used for controlling the priority of loading
 // tables/views since Db/Function metadata is loaded on startup.
@@ -736,4 +765,10 @@ service CatalogService {
   // Update recently used tables and their usage counts in an impalad since the last
   // report.
   TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req);
+
+  // Gets the null partition name used at HMS.
+  TGetNullPartitionNameResponse GetNullPartitionName(1: TGetNullPartitionNameRequest req);
+
+  // Gets the latest compactions.
+  TGetLatestCompactionsResponse GetLatestCompactions(1: TGetLatestCompactionsRequest req);
 }
diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 038f6eb1e..21f0a7266 100644
--- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -589,6 +589,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         "getPartitionsForRefreshingFileMetadata is not supported.");
   }
 
+  /**
+   * CDP Hive-3 only function.
+   */
+  public static Map<String, Long> getLatestCompactions(MetaStoreClient client,
+      String dbName, String tableName, List<String> partitionNames,
+      String unPartitionedName, long lastCompactionId) throws TException {
+    throw new UnsupportedOperationException("getLatestCompactions is not supported.");
+  }
+
   /**
    * CDP Hive-3 only function.
    */
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index d3f33e8cb..a1ea42fe8 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -743,39 +743,19 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
   }
 
   /**
-   * Fetches the latest compaction id from HMS and compares with partition metadata in
-   * cache. If a partition is stale due to compaction, removes it from metas.
+   * Fetches the latest compactions from HMS as a map of partition name to compaction id.
    */
-  public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
-      String dbName, String tableName, TableMetaRef table,
-      Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
-      throws TException {
-    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
-    Preconditions.checkNotNull(metas, "Partition map must be non-null");
-    if (metas.isEmpty()) {
-      return Collections.emptyList();
-    }
-    Stopwatch sw = Stopwatch.createStarted();
-    List<PartitionRef> stalePartitions = new ArrayList<>();
-    if (!table.isTransactional() || metas.isEmpty()) return stalePartitions;
+  public static Map<String, Long> getLatestCompactions(MetaStoreClient client,
+      String dbName, String tableName, List<String> partitionNames,
+      String unPartitionedName, long lastCompactionId) throws TException {
     GetLatestCommittedCompactionInfoRequest request =
         new GetLatestCommittedCompactionInfoRequest(dbName, tableName);
-    if (table.isPartitioned()) {
-      request.setPartitionnames(metas.keySet().stream()
-          .map(PartitionRef::getName).collect(Collectors.toList()));
-    }
-    long lastCompactionId = metas.values().stream()
-        .mapToLong(p -> p.getLastCompactionId()).max().orElse(-1);
-    if (lastCompactionId > 0) {
-      request.setLastCompactionId(lastCompactionId);
-    }
-
+    request.setPartitionnames(partitionNames);
+    if (lastCompactionId > 0) request.setLastCompactionId(lastCompactionId);
     GetLatestCommittedCompactionInfoResponse response;
-    try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
-      response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
-    }
+    response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
     Map<String, Long> partNameToCompactionId = new HashMap<>();
-    if (table.isPartitioned()) {
+    if (partitionNames != null) {
       for (CompactionInfoStruct ci : response.getCompactions()) {
         if (ci.getPartitionname() != null) {
           partNameToCompactionId.put(ci.getPartitionname(), ci.getId());
@@ -786,14 +766,41 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         }
       }
     } else {
-      CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(),
-          null);
-      if (ci != null) {
-        partNameToCompactionId.put(unPartitionedName, ci.getId());
-      }
+      CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(), null);
+      if (ci != null) partNameToCompactionId.put(unPartitionedName, ci.getId());
+    }
+    return partNameToCompactionId;
+  }
+
+  /**
+   * Fetches the latest compaction id from HMS and compares with partition metadata in
+   * cache. If a partition is stale due to compaction, removes it from metas.
+   */
+  public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
+      String dbName, String tableName, TableMetaRef table,
+      Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
+      throws TException {
+    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+    Preconditions.checkNotNull(metas, "Partition map must be non-null");
+    if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList();
+    Stopwatch sw = Stopwatch.createStarted();
+    List<String> partitionNames = null;
+    if (table.isPartitioned()) {
+      partitionNames =
+          metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList());
     }
-    Iterator<Entry<PartitionRef, PartitionMetadata>> iter =
-        metas.entrySet().iterator();
+    long lastCompactionId = metas.values()
+                                .stream()
+                                .mapToLong(PartitionMetadata::getLastCompactionId)
+                                .max()
+                                .orElse(-1);
+    Map<String, Long> partNameToCompactionId = Collections.emptyMap();
+    try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) {
+      partNameToCompactionId = getLatestCompactions(
+          client, dbName, tableName, partitionNames, unPartitionedName, lastCompactionId);
+    }
+    List<PartitionRef> stalePartitions = new ArrayList<>();
+    Iterator<Entry<PartitionRef, PartitionMetadata>> iter = metas.entrySet().iterator();
     while (iter.hasNext()) {
       Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
       if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index cbfbfbf96..2a7b611b1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -20,8 +20,10 @@ package org.apache.impala.catalog.local;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,6 +65,7 @@ import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
 import org.apache.impala.catalog.Principal;
 import org.apache.impala.catalog.PrincipalPrivilege;
@@ -71,6 +74,7 @@ import org.apache.impala.catalog.VirtualColumn;
 import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.FrontendProfile;
@@ -88,6 +92,8 @@ import org.apache.impala.thrift.TDbInfoSelector;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionName;
+import org.apache.impala.thrift.TGetLatestCompactionsRequest;
+import org.apache.impala.thrift.TGetLatestCompactionsResponse;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsFileDesc;
@@ -117,6 +123,7 @@ import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -261,11 +268,6 @@ public class CatalogdMetaProvider implements MetaProvider {
   private final ListMap<TNetworkAddress> cacheHostIndex_ =
       new ListMap<TNetworkAddress>();
 
-  // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
-  // of metadata. In order to incrementally build this out, we delegate various calls
-  // to the "direct" provider for now and circumvent catalogd.
-  private DirectMetaProvider directProvider_ = new DirectMetaProvider();
-
   /**
    * Number of requests which piggy-backed on a concurrent request for the same key,
    * and resulted in success. Used only for test assertions.
@@ -948,8 +950,8 @@ public class CatalogdMetaProvider implements MetaProvider {
         hostIndex, partitionRefs);
     if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
       // If any partitions are stale after compaction, they will be removed from refToMeta
-      List<PartitionRef> stalePartitions = directProvider_.checkLatestCompaction(
-          refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
+      List<PartitionRef> stalePartitions =
+          checkLatestCompaction(refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta);
       cache_.invalidateAll(stalePartitions.stream()
           .map(PartitionRefImpl.class::cast)
           .map(PartitionRefImpl::getId)
@@ -988,6 +990,55 @@ public class CatalogdMetaProvider implements MetaProvider {
     return nameToMeta;
   }
 
+  /**
+   * Fetches the latest compactions from catalogd and compares with partition metadata in
+   * cache. If a partition is stale due to compaction, removes it from metas. Returns
+   * the stale partitions.
+   */
+  private List<PartitionRef> checkLatestCompaction(String dbName, String tableName,
+      TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException {
+    Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
+    Preconditions.checkNotNull(metas, "Partition map must be non-null");
+    if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList();
+    Stopwatch sw = Stopwatch.createStarted();
+    TGetLatestCompactionsRequest req = new TGetLatestCompactionsRequest();
+    req.db_name = dbName;
+    req.table_name = tableName;
+    req.non_parition_name = HdfsTable.DEFAULT_PARTITION_NAME;
+    if (table.isPartitioned()) {
+      req.partition_names =
+          metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList());
+    }
+    req.last_compaction_id = metas.values()
+                                 .stream()
+                                 .mapToLong(PartitionMetadata::getLastCompactionId)
+                                 .max()
+                                 .orElse(-1);
+    byte[] ret = FeSupport.GetLatestCompactions(
+        new TSerializer(new TBinaryProtocol.Factory()).serialize(req));
+    TGetLatestCompactionsResponse resp = new TGetLatestCompactionsResponse();
+    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(resp, ret);
+    if (resp.status.status_code != TErrorCode.OK) {
+      throw new TException(Joiner.on("\n").join(resp.status.getError_msgs()));
+    }
+    Map<String, Long> partNameToCompactionId = resp.partition_to_compaction_id;
+    Preconditions.checkNotNull(
+        partNameToCompactionId, "Partition name to compaction id map must be non-null");
+    List<PartitionRef> stalePartitions = new ArrayList<>();
+    Iterator<Map.Entry<PartitionRef, PartitionMetadata>> iter =
+        metas.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
+      if (partNameToCompactionId.containsKey(entry.getKey().getName())) {
+        stalePartitions.add(entry.getKey());
+        iter.remove();
+      }
+    }
+    LOG.info("Checked the latest compaction info for {}.{}. Time taken: {}", dbName,
+        tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
+    return stalePartitions;
+  }
+
   /**
    * Load the specified partitions 'prefs' from catalogd. The partitions are made
    * relative to the given 'hostIndex' before being returned.
@@ -1189,7 +1240,7 @@ public class CatalogdMetaProvider implements MetaProvider {
           /** Called to load cache for cache misses */
           @Override
           public String call() throws Exception {
-            return directProvider_.loadNullPartitionKeyValue();
+            return FeSupport.GetNullPartitionName();
           }
         });
   }
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index e8ea8430a..4b56ca15c 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -36,6 +36,8 @@ import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TExprBatch;
+import org.apache.impala.thrift.TGetNullPartitionNameRequest;
+import org.apache.impala.thrift.TGetNullPartitionNameResponse;
 import org.apache.impala.thrift.TGetPartitionStatsRequest;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
 import org.apache.impala.thrift.TParseDateStringResult;
@@ -140,6 +142,12 @@ public class FeSupport {
   // E.g.: '2011-01-01', '2011-01-1', '2011-1-01', '2011-01-01'.
   public native static byte[] nativeParseDateString(String date);
 
+  // Does an RPC to the Catalog Server to get the null partition name.
+  public native static byte[] NativeGetNullPartitionName(byte[] thriftReq);
+
+  // Does an RPC to the Catalog Server to get the latest compactions.
+  public native static byte[] NativeGetLatestCompactions(byte[] thriftReq);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -490,6 +498,40 @@ public class FeSupport {
     }
   }
 
+  private static byte[] GetNullPartitionName(byte[] thriftReq) {
+    try {
+      return NativeGetNullPartitionName(thriftReq);
+    } catch (UnsatisfiedLinkError e) { loadLibrary(); }
+    return NativeGetNullPartitionName(thriftReq);
+  }
+
+  public static String GetNullPartitionName() throws InternalException {
+    TGetNullPartitionNameRequest request = new TGetNullPartitionNameRequest();
+    TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse();
+    try {
+      byte[] result = GetNullPartitionName(
+          new TSerializer(new TBinaryProtocol.Factory()).serialize(request));
+      Preconditions.checkNotNull(result);
+      new TDeserializer(new TBinaryProtocol.Factory()).deserialize(response, result);
+      if (response.getStatus().getStatus_code() != TErrorCode.OK) {
+        throw new InternalException("Error requesting GetNullPartitionName: "
+            + Joiner.on("\n").join(response.getStatus().getError_msgs()));
+      }
+      Preconditions.checkNotNull(response.partition_value);
+      return response.partition_value;
+    } catch (TException e) {
+      // this should never happen
+      throw new InternalException("Error processing request: " + e.getMessage(), e);
+    }
+  }
+
+  public static byte[] GetLatestCompactions(byte[] thriftReq) {
+    try {
+      return NativeGetLatestCompactions(thriftReq);
+    } catch (UnsatisfiedLinkError e) { loadLibrary(); }
+    return NativeGetLatestCompactions(thriftReq);
+  }
+
   /**
    * Calling this function before loadLibrary() causes external frontend
    * initialization to be used during NativeFeInit()
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 1adfd7efa..c25faabaa 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -68,6 +68,9 @@ import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
 import org.apache.impala.thrift.TGetFunctionsResponse;
+import org.apache.impala.thrift.TGetLatestCompactionsRequest;
+import org.apache.impala.thrift.TGetLatestCompactionsResponse;
+import org.apache.impala.thrift.TGetNullPartitionNameResponse;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartitionStatsRequest;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
@@ -85,6 +88,7 @@ import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.AuthorizationUtil;
 import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.GlogAppender;
+import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.impala.util.TUniqueIdUtil;
@@ -562,4 +566,32 @@ public class JniCatalog {
   public void refreshDataSources() throws TException {
     catalog_.refreshDataSources();
   }
+
+  public byte[] getNullPartitionName() throws ImpalaException, TException {
+    return execAndSerialize("getNullPartitionName", "Getting null partition name", () -> {
+      TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse();
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        response.setPartition_value(
+            MetaStoreUtil.getNullPartitionKeyValue(msClient.getHiveClient()));
+        response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
+      }
+      return response;
+    });
+  }
+
+  public byte[] getLatestCompactions(byte[] thriftParams)
+      throws ImpalaException, TException {
+    TGetLatestCompactionsRequest request = new TGetLatestCompactionsRequest();
+    JniUtil.deserializeThrift(protocolFactory_, request, thriftParams);
+    return execAndSerialize("getLatestCompactions", "Getting latest compactions", () -> {
+      TGetLatestCompactionsResponse response = new TGetLatestCompactionsResponse();
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        response.setPartition_to_compaction_id(MetastoreShim.getLatestCompactions(
+            msClient, request.db_name, request.table_name, request.partition_names,
+            request.non_parition_name, request.last_compaction_id));
+        response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
+      }
+      return response;
+    });
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index d0cc54deb..57daa7765 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -611,6 +611,21 @@ public class CatalogdMetaProviderTest {
     }
   }
 
+  @Test
+  public void testLoadNullPartitionKeyValue() throws Exception {
+    provider_.cache_.invalidateAll();
+    CacheStats stats = diffStats();
+    String nullPartitionName = provider_.loadNullPartitionKeyValue();
+    assertNotNull(nullPartitionName);
+    stats = diffStats();
+    assertEquals(1, stats.missCount());
+    assertEquals(0, stats.hitCount());
+    assertEquals(nullPartitionName, provider_.loadNullPartitionKeyValue());
+    stats = diffStats();
+    assertEquals(0, stats.missCount());
+    assertEquals(1, stats.hitCount());
+  }
+
   private void testFileMetadataAfterCompaction(String dbName, String tableName,
       String partition, boolean isMajorCompaction) throws Exception {
     String tableOrPartition = dbName + "." + tableName + " " + partition;


(impala) 02/03: IMPALA-11553: Add event specific metrics in the table metrics

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 324a1aa37e4abecc73e8ccd39cca75cfcc54791e
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Thu Sep 7 18:09:19 2023 -0700

    IMPALA-11553: Add event specific metrics in the table metrics
    
    This patch adds an event specific metric "avg-events-process-duration"
    to the table level metrics. This metric is also extended to the last
    1min, 5mins and 15mins durations. It is useful for identifying the
    average event processing duration on the table. This helps identify
    whether a particular table is causing the event processor to lag; as a
    temporary workaround, event processing can be disabled on that table.
    
    Another metric, "events-consuming-delay-ms", is also added to the event
    processor summary page. It is the time difference in milliseconds
    between the event being created in the metastore and the event being
    processed by the event processor. This is another useful metric to
    gauge how far the event processor is lagging.
    
    Tests:
      - Manually verified the metrics on catalogD UI page when running some
    hive workloads.
    
    Change-Id: I2428029361e610a0fcd8ed11be2ab771f03b00dd
    Reviewed-on: http://gerrit.cloudera.org:8080/20473
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../main/java/org/apache/impala/catalog/Table.java | 10 +++
 .../impala/catalog/events/MetastoreEvents.java     | 73 +++++++++++++++-------
 .../catalog/events/MetastoreEventsProcessor.java   |  9 +++
 tests/custom_cluster/test_events_custom_configs.py |  4 +-
 tests/webserver/test_web_pages.py                  |  4 ++
 5 files changed, 76 insertions(+), 24 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index eb8a863bd..97e3f4042 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -61,6 +61,7 @@ import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 
+import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -196,6 +197,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
   public static final AtomicInteger LOADING_TABLES = new AtomicInteger(0);
 
+  // Table property key to determine the table's events process duration
+  public static final String TBL_EVENTS_PROCESS_DURATION = "events-process-duration";
+
+  // The last sync event id of the table
+  public static final String LAST_SYNC_EVENT_ID = "last-sync-event-id";
+
   // this field represents the last event id in metastore upto which this table is
   // synced. It is used if the flag sync_to_latest_event_on_ddls is set to true.
   // Making it as volatile so that read and write of this variable are thread safe.
@@ -382,6 +389,9 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     metrics_.addTimer(HMS_LOAD_TBL_SCHEMA);
     metrics_.addTimer(LOAD_DURATION_ALL_COLUMN_STATS);
     metrics_.addCounter(NUMBER_OF_INFLIGHT_EVENTS);
+    metrics_.addTimer(TBL_EVENTS_PROCESS_DURATION);
+    metrics_.addGauge(LAST_SYNC_EVENT_ID,
+        (Gauge<Long>) () -> Long.valueOf(lastSyncedEventId_));
   }
 
   public Metrics getMetrics() { return metrics_; }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 0e40c37bf..f6ffdb1b5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.events;
 
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -70,6 +71,8 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
 
+import static org.apache.impala.catalog.Table.TBL_EVENTS_PROCESS_DURATION;
+
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TPartitionKeyValue;
@@ -1092,6 +1095,25 @@ public class MetastoreEvents {
       }
       return false;
     }
+
+    @Override
+    protected void process() throws MetastoreNotificationException, CatalogException {
+      Timer.Context context = null;
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_);
+      if (tbl != null) {
+        context = tbl.getMetrics().getTimer(TBL_EVENTS_PROCESS_DURATION).time();
+      }
+      try {
+        processTableEvent();
+      } finally {
+        if (context != null) {
+          context.stop();
+        }
+      }
+    }
+
+    protected abstract void processTableEvent() throws MetastoreNotificationException,
+        CatalogException;
   }
 
   /**
@@ -1199,7 +1221,7 @@ public class MetastoreEvents {
      * overridden. Else, it will ignore the event
      */
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       // check if the table exists already. This could happen in corner cases of the
       // table being dropped and recreated with the same name or in case this event is
       // a self-event (see description of self-event in the class documentation of
@@ -1348,7 +1370,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       if (isSelfEvent()) {
         infoLog("Not processing the insert event as it is a self-event");
         return;
@@ -1515,7 +1537,8 @@ public class MetastoreEvents {
      * table on the tblName from the event
      */
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       if (isRename_) {
         processRename();
         return;
@@ -1778,9 +1801,10 @@ public class MetastoreEvents {
      * not a huge problem since the tables will eventually be created when the
      * create events are processed but there will be a non-zero amount of time when the
      * table will not be existing in catalog.
+     * TODO: IMPALA-12646, to track average process time for drop operations.
      */
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       Reference<Boolean> tblRemovedLater = new Reference<>();
       boolean removedTable;
       removedTable = catalogOpExecutor_
@@ -2105,9 +2129,11 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       // bail out early if there are not partitions to process
       if (addedPartitions_.isEmpty()) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         infoLog("Partition list is empty. Ignoring this event.");
         return;
       }
@@ -2241,7 +2267,8 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
@@ -2260,11 +2287,11 @@ public class MetastoreEvents {
             + "parameters which can be ignored.");
         return;
       }
-
       // Reload the whole table if it's a transactional table or materialized view.
-      // Materialized views are treated as a special case because it's possible to receive
-      // partition event on MVs, but they are regular views in Impala. That cause problems
-      // on the reloading partition logic which expects it to be a HdfsTable.
+      // Materialized views are treated as a special case because it's possible to
+      // receive partition event on MVs, but they are regular views in Impala. That
+      // cause problems on the reloading partition logic which expects it to be a
+      // HdfsTable.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
           || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
         reloadTransactionalTable();
@@ -2279,11 +2306,11 @@ public class MetastoreEvents {
           reloadPartitions(Arrays.asList(partitionAfter_),
               FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, "ALTER_PARTITION event");
         } catch (CatalogException e) {
-          throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
-                  + "partition on table {} partition {} failed. Event processing cannot "
-                  + "continue. Issue an invalidate command to reset the event processor "
-                  + "state.", getFullyQualifiedTblName(),
-              HdfsTable.constructPartitionName(tPartSpec)), e);
+          throw new MetastoreNotificationNeedsInvalidateException(
+              debugString("Refresh partition on table {} partition {} failed. Event " +
+                  "processing cannot continue. Issue an invalidate command to reset " +
+                  "the event processor state.", getFullyQualifiedTblName(),
+                  HdfsTable.constructPartitionName(tPartSpec)), e);
         }
       }
     }
@@ -2381,12 +2408,12 @@ public class MetastoreEvents {
     List<T> getBatchEvents() { return batchedEvents_; }
 
     @Override
-    protected void process() throws MetastoreNotificationException, CatalogException {
+    protected void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
-
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       List<T> eventsToProcess = new ArrayList<>();
@@ -2503,12 +2530,15 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       // we have seen cases where a add_partition event is generated with empty
       // partition list (see IMPALA-8547 for details. Make sure that droppedPartitions
       // list is not empty
       if (droppedPartitions_.isEmpty()) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         infoLog("Partition list is empty. Ignoring this event.");
+        return;
       }
       try {
         // Reload the whole table if it's a transactional table or materialized view.
@@ -2587,7 +2617,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    protected void process() throws MetastoreNotificationException {
+    protected void processTableEvent() throws MetastoreNotificationException {
       if (msTbl_ == null) {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
@@ -2686,7 +2716,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       if (isSelfEvent() || isOlderEvent()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
             .inc(getNumberOfEvents());
@@ -2695,7 +2725,6 @@ public class MetastoreEvents {
                 .getCount());
         return;
       }
-
       if (isRefresh_) {
         if (reloadPartition_ != null) {
           processPartitionReload();
@@ -2875,7 +2904,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    protected void process() throws MetastoreNotificationException {
+    protected void processTableEvent() throws MetastoreNotificationException {
       try {
         if (partitionName_ == null) {
           reloadTableFromCatalog("Commit Compaction event", true);
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 3a5a6efd7..aa49cfc47 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -268,6 +268,11 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   // number of batch events generated
   public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
 
+  // metric to measure the delay in msec, between the event created in metastore and time
+  // it took to be consumed by the event processor
+  public static final String AVG_DELAY_IN_CONSUMING_EVENTS = "events-consuming" +
+      "-delay";
+
   private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L;
 
   // List of event types to skip while fetching notification events from metastore
@@ -647,6 +652,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     metrics_
         .addGauge(DELETE_EVENT_LOG_SIZE, (Gauge<Integer>) deleteEventLog_::size);
     metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
+    metrics_.addTimer(AVG_DELAY_IN_CONSUMING_EVENTS);
   }
 
   /**
@@ -1162,6 +1168,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
           deleteEventLog_.garbageCollect(event.getEventId());
           lastSyncedEventId_.set(event.getEventId());
           lastSyncedEventTimeSecs_.set(event.getEventTime());
+          metrics_.getTimer(AVG_DELAY_IN_CONSUMING_EVENTS).update(
+              (System.currentTimeMillis() / 1000) - event.getEventTime(),
+                  TimeUnit.SECONDS);
         }
       }
     } catch (CatalogException e) {
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 04efc7255..31ee6880a 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -992,8 +992,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
         "Failing query(impala={}): {}".format(use_impala_client, stmt)
     else:
       # hive was used to run the stmts, any events generated should not have been deemed
-      # as self events
-      assert events_skipped == events_skipped_after
+      # as self events unless there are empty partition add/drop events
+      assert events_skipped <= events_skipped_after
 
   def __get_tbl_location(self, db_name, tbl_name):
     assert self.hive_client is not None
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 553ce0218..b46490bfd 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -54,6 +54,7 @@ class TestWebPage(ImpalaTestSuite):
   PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
   QUERIES_URL = "http://localhost:{0}/queries"
   HEALTHZ_URL = "http://localhost:{0}/healthz"
+  EVENT_PROCESSOR_URL = "http://localhost:{0}/events"
 
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -339,8 +340,11 @@ class TestWebPage(ImpalaTestSuite):
     self.__test_table_metrics(unique_database, "foo_part", "total-file-size-bytes")
     self.__test_table_metrics(unique_database, "foo_part", "num-files")
     self.__test_table_metrics(unique_database, "foo_part", "alter-duration")
+    self.__test_table_metrics(unique_database, "foo_part", "events-process-duration")
     self.__test_catalog_tablesfilesusage(unique_database, "foo_part", "1")
     self.__test_catalog_tables_loading_time(unique_database, "foo_part")
+    self.get_and_check_status(self.EVENT_PROCESSOR_URL, "events-consuming-delay",
+        ports_to_test=self.CATALOG_TEST_PORT)
 
   def __test_catalog_object(self, db_name, tbl_name, cluster_properties):
     """Tests the /catalog_object endpoint for the given db/table. Runs


(impala) 03/03: IMPALA-12668: Enable clang-tidy checks for implicit fallthrough

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dd8ddf77c39ac34457497e160aefe707b7324331
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Fri Dec 15 16:34:47 2023 -0800

    IMPALA-12668: Enable clang-tidy checks for implicit fallthrough
    
    In switch/case statements, one case can fallthrough to the
    next case. Sometimes this is intentional, but it is also a
    common source of bugs (i.e. a missing break/return statement).
    Clang-Tidy's clang-diagnostic-implicit-fallthrough flags
    locations where a case falls through to the next case without
    an explicit fallthrough declaration.
    
    This change enables clang-diagnostic-implicit-fallthrough and
    fixes failing locations. Since Impala uses C++17, this uses
    C++17's [[fallthrough]] to indicate an explicit fallthrough.
    This also adjusts clang-tidy's output to suggest [[fallthrough]]
    as the preferred way to indicate fallthrough.
    
    Testing:
     - Ran core job
     - Ran clang tidy
    
    Change-Id: I6d65c92b442fa0317c3af228997571e124a54092
    Reviewed-on: http://gerrit.cloudera.org:8080/20847
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Zihao Ye <ey...@163.com>
    Reviewed-by: Michael Smith <mi...@cloudera.com>
---
 .clang-tidy                                      |  1 -
 be/src/benchmarks/atoi-benchmark.cc              | 19 ++++++--
 be/src/codegen/codegen-anyval-read-write-info.cc |  1 +
 be/src/codegen/codegen-anyval.cc                 |  1 +
 be/src/exec/orc/hdfs-orc-scanner.cc              |  1 +
 be/src/exec/parquet/parquet-column-stats.cc      |  1 +
 be/src/exprs/agg-fn-evaluator.cc                 |  4 +-
 be/src/exprs/anyval-util.h                       |  3 +-
 be/src/runtime/datetime-parser-common.cc         |  1 +
 be/src/runtime/descriptors.cc                    |  2 +
 be/src/runtime/io/disk-file.cc                   |  2 +
 be/src/runtime/raw-value.cc                      |  1 +
 be/src/runtime/raw-value.inline.h                |  3 +-
 be/src/runtime/runtime-filter-ir.cc              |  2 +
 be/src/runtime/types.cc                          |  1 +
 be/src/service/query-result-set.cc               |  1 +
 be/src/util/bit-packing.inline.h                 | 19 +++++---
 be/src/util/hash-util.h                          | 56 +++++++++++++++++-------
 be/src/util/jwt-util.cc                          |  3 +-
 be/src/util/redactor.cc                          |  1 +
 be/src/util/string-parser.h                      |  7 ++-
 bin/run_clang_tidy.sh                            |  7 ++-
 22 files changed, 105 insertions(+), 32 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index f54f14933..c4499f953 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -48,7 +48,6 @@ Checks: "-*,clang*,\
 -clang-diagnostic-gnu-anonymous-struct,\
 -clang-diagnostic-gnu-zero-variadic-macro-arguments,\
 -clang-diagnostic-header-hygiene,\
--clang-diagnostic-implicit-fallthrough,\
 -clang-diagnostic-missing-prototypes,\
 -clang-diagnostic-missing-variable-declarations,\
 -clang-diagnostic-nested-anon-types,\
diff --git a/be/src/benchmarks/atoi-benchmark.cc b/be/src/benchmarks/atoi-benchmark.cc
index b47f257d8..0fcdd068e 100644
--- a/be/src/benchmarks/atoi-benchmark.cc
+++ b/be/src/benchmarks/atoi-benchmark.cc
@@ -91,7 +91,9 @@ inline int32_t AtoiUnsafe(const char* s, int len) {
   bool negative = false;
   int i = 0;
   switch (*s) {
-    case '-': negative = true;
+    case '-':
+      negative = true;
+      [[fallthrough]];
     case '+': ++i;
   }
 
@@ -107,25 +109,34 @@ inline int32_t AtoiUnrolled(const char* s, int len) {
     int32_t val = 0;
     bool negative = false;
     switch (*s) {
-      case '-': negative = true;
+      case '-':
+        negative = true;
+        [[fallthrough]];
       case '+': --len; ++s;
     }
 
     switch(len) {
       case 8:
         val += (DIGIT(s[len - 8])) * 10000;
+        [[fallthrough]];
       case 7:
         val += (DIGIT(s[len - 7])) * 10000;
+        [[fallthrough]];
       case 6:
         val += (DIGIT(s[len - 6])) * 10000;
+        [[fallthrough]];
       case 5:
         val += (DIGIT(s[len - 5])) * 10000;
+        [[fallthrough]];
       case 4:
         val += (DIGIT(s[len - 4])) * 1000;
+        [[fallthrough]];
       case 3:
         val += (DIGIT(s[len - 3])) * 100;
+        [[fallthrough]];
       case 2:
         val += (DIGIT(s[len - 2])) * 10;
+        [[fallthrough]];
       case 1:
         val += (DIGIT(s[len - 1]));
     }
@@ -140,7 +151,9 @@ inline int32_t AtoiCased(const char* s, int len) {
     int32_t val = 0;
     bool negative = false;
     switch (*s) {
-      case '-': negative = true;
+      case '-':
+        negative = true;
+        [[fallthrough]];
       case '+': --len; ++s;
     }
 
diff --git a/be/src/codegen/codegen-anyval-read-write-info.cc b/be/src/codegen/codegen-anyval-read-write-info.cc
index 263e3f049..d68c300ef 100644
--- a/be/src/codegen/codegen-anyval-read-write-info.cc
+++ b/be/src/codegen/codegen-anyval-read-write-info.cc
@@ -134,6 +134,7 @@ void CodegenAnyValReadWriteInfo::CodegenConvertToCanonicalForm() {
       llvm::Value* new_val = CodegenAnyVal::ConvertToCanonicalForm(codegen_, builder_,
           type_, GetSimpleVal());
       SetSimpleVal(new_val);
+      break;
     }
     default:
       ;
diff --git a/be/src/codegen/codegen-anyval.cc b/be/src/codegen/codegen-anyval.cc
index 773d3ccf0..b98f9e31c 100644
--- a/be/src/codegen/codegen-anyval.cc
+++ b/be/src/codegen/codegen-anyval.cc
@@ -598,6 +598,7 @@ void CodegenAnyVal::ConvertToCanonicalForm() {
       llvm::Value* new_val = ConvertToCanonicalForm(codegen_,
           builder_, type_, GetVal());
       SetVal(new_val);
+      break;
     }
     default:
       ;
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index 6858be523..6c8cf7822 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -689,6 +689,7 @@ void HdfsOrcScanner::SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor*
       break;
     case ACID_FIELD_ROWID_INDEX:
       file_position_ = slot_desc;
+      break;
     default:
       break;
   }
diff --git a/be/src/exec/parquet/parquet-column-stats.cc b/be/src/exec/parquet/parquet-column-stats.cc
index 028601810..63e7ab116 100644
--- a/be/src/exec/parquet/parquet-column-stats.cc
+++ b/be/src/exec/parquet/parquet-column-stats.cc
@@ -174,6 +174,7 @@ bool ColumnStatsReader::ReadFromString(StatsField stats_field,
               static_cast<Decimal16Value*>(slot));
         }
       DCHECK(false) << "Unknown decimal byte size: " << col_type_.GetByteSize();
+      break;
     case TYPE_DATE:
       return ColumnStats<DateValue>::DecodePlainValue(encoded_value, slot, element_.type);
     default:
diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc
index 7eafa1ae1..bf74ddab2 100644
--- a/be/src/exprs/agg-fn-evaluator.cc
+++ b/be/src/exprs/agg-fn-evaluator.cc
@@ -260,7 +260,9 @@ void AggFnEvaluator::SetDstSlot(const AnyVal* src, const SlotDescriptor& dst_slo
 #endif
           return;
         default:
-          break;
+          DCHECK(false) << "Unknown decimal byte size: "
+                        << dst_slot_desc.type().GetByteSize();
+          return;
       }
     case TYPE_DATE:
       *reinterpret_cast<DateValue*>(slot) =
diff --git a/be/src/exprs/anyval-util.h b/be/src/exprs/anyval-util.h
index c930012c0..f14f83526 100644
--- a/be/src/exprs/anyval-util.h
+++ b/be/src/exprs/anyval-util.h
@@ -330,7 +330,8 @@ class AnyValUtil {
 #endif
             return;
           default:
-            break;
+            DCHECK(false) << "Unknown decimal byte size: " << type.GetByteSize();
+            return;
         }
       case TYPE_DATE:
         *reinterpret_cast<DateVal*>(dst) =
diff --git a/be/src/runtime/datetime-parser-common.cc b/be/src/runtime/datetime-parser-common.cc
index efb026dac..1c81b6516 100644
--- a/be/src/runtime/datetime-parser-common.cc
+++ b/be/src/runtime/datetime-parser-common.cc
@@ -130,6 +130,7 @@ void ReportBadFormat(FunctionContext* context, FormatTokenizationResult error_ty
         break;
       case MISPLACED_FX_MODIFIER_ERROR:
         ss << "PARSE ERROR: FX modifier should be at the beginning of the format string.";
+        break;
       case CONFLICTING_DAY_OF_WEEK_TOKENS_ERROR:
         ss << "PARSE ERROR: Multiple day of week tokens provided.";
         break;
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 28e10c842..c3567fdba 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -1011,6 +1011,7 @@ CodegenAnyValReadWriteInfo CodegenAnyValToReadWriteInfo(CodegenAnyVal& any_val,
     case TYPE_STRUCT:
       DCHECK(false) << "Invalid type for this function. "
                     << "Call 'StoreStructToNativePtr()' instead.";
+      break;
     default:
       DCHECK(false) << "NYI: " << rwi.type().DebugString();
       break;
@@ -1074,6 +1075,7 @@ void SlotDescriptor::CodegenStoreNonNullAnyVal(
     case TYPE_STRUCT:
       DCHECK(false) << "Invalid type for this function. "
                     << "Call 'StoreStructToNativePtr()' instead.";
+      break;
     default:
       DCHECK(false) << "NYI: " << type.DebugString();
       break;
diff --git a/be/src/runtime/io/disk-file.cc b/be/src/runtime/io/disk-file.cc
index 767047567..85a378653 100644
--- a/be/src/runtime/io/disk-file.cc
+++ b/be/src/runtime/io/disk-file.cc
@@ -110,8 +110,10 @@ void MemBlock::Delete(bool* reserved, bool* allocated) {
       free(data_);
       data_ = nullptr;
       *allocated = true;
+      [[fallthrough]];
     case MemBlockStatus::RESERVED:
       *reserved = true;
+      [[fallthrough]];
     default:
       SetStatusLocked(lock, MemBlockStatus::DISABLED);
       DCHECK(data_ == nullptr);
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index a6d9fbe2f..3e1721695 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -206,6 +206,7 @@ void RawValue::WriteNonNullPrimitive(const void* value, void* dst, const ColumnT
     case TYPE_STRUCT: {
       // Structs should be handled by a different Write() function within this class.
       DCHECK(false);
+      break;
     }
     default:
       DCHECK(false) << "RawValue::WriteNonNullPrimitive(): bad type: "
diff --git a/be/src/runtime/raw-value.inline.h b/be/src/runtime/raw-value.inline.h
index 4f135856b..390a0ccfa 100644
--- a/be/src/runtime/raw-value.inline.h
+++ b/be/src/runtime/raw-value.inline.h
@@ -147,7 +147,8 @@ inline bool RawValue::Eq(const void* v1, const void* v2, const ColumnType& type)
           return reinterpret_cast<const Decimal16Value*>(v1)->value()
               == reinterpret_cast<const Decimal16Value*>(v2)->value();
         default:
-          break;
+          DCHECK(false) << "Unknown decimal byte size: " << type.GetByteSize();
+          return 0;
       }
     default:
       DCHECK(false) << type;
diff --git a/be/src/runtime/runtime-filter-ir.cc b/be/src/runtime/runtime-filter-ir.cc
index 5c4f8113a..b515a3fd9 100644
--- a/be/src/runtime/runtime-filter-ir.cc
+++ b/be/src/runtime/runtime-filter-ir.cc
@@ -37,12 +37,14 @@ bool IR_ALWAYS_INLINE RuntimeFilter::Eval(
           return filter->EvalOverlap(col_type, val, val);
         }
       }
+      break;
     }
     case TRuntimeFilterType::IN_LIST: {
       InListFilter* filter = get_in_list_filter();
       if (LIKELY(filter && !filter->AlwaysTrue())) {
         return filter->Find(val, col_type);
       }
+      break;
     }
   }
   return true;
diff --git a/be/src/runtime/types.cc b/be/src/runtime/types.cc
index cf7ba89b4..dd1beeeb1 100644
--- a/be/src/runtime/types.cc
+++ b/be/src/runtime/types.cc
@@ -146,6 +146,7 @@ TPrimitiveType::type ToThrift(PrimitiveType ptype) {
     case TYPE_ARRAY:
     case TYPE_MAP:
       DCHECK(false) << "NYI: " << ptype;
+      [[fallthrough]];
     default: return TPrimitiveType::INVALID_TYPE;
   }
 }
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
index f32f05fa1..97d97d97e 100644
--- a/be/src/service/query-result-set.cc
+++ b/be/src/service/query-result-set.cc
@@ -472,6 +472,7 @@ int HS2ColumnarResultSet::AddRows(
         to->binaryVal.values.insert(to->binaryVal.values.end(),
             from->binaryVal.values.begin() + start_idx,
             from->binaryVal.values.begin() + start_idx + rows_added);
+        break;
       default:
         DCHECK(false) << "Unsupported type: "
                       << TypeToString(ThriftToType(
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 7863db0fb..6e859afae 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -411,8 +411,10 @@ const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in,
 
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
-  case 31 - i: out[30 - i] = \
-      static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer));
+  case 31 - i: \
+    out[30 - i] = \
+        static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer)); \
+    [[fallthrough]];
 
   // Use switch with fall-through cases to minimise branching.
   switch (num_values) {
@@ -451,11 +453,14 @@ const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict
 
 #pragma push_macro("DECODE_VALUES_CASE")
 #define DECODE_VALUES_CASE(ignore1, i, ignore2) \
-  case 31 - i: { \
-    uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer); \
-    uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + (30 - i) * stride; \
-    DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \
-  }
+  case 31 - i: \
+    { \
+      uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer); \
+      uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + (30 - i) * stride; \
+      DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), \
+          decode_error); \
+    } \
+    [[fallthrough]];
 
   // Use switch with fall-through cases to minimise branching.
   switch (num_values) {
diff --git a/be/src/util/hash-util.h b/be/src/util/hash-util.h
index bb60997ae..d98dd47b3 100644
--- a/be/src/util/hash-util.h
+++ b/be/src/util/hash-util.h
@@ -140,14 +140,27 @@ class HashUtil {
 
     const uint8_t* data2 = reinterpret_cast<const uint8_t*>(data);
     switch (len & 7) {
-      case 7: h ^= uint64_t(data2[6]) << 48;
-      case 6: h ^= uint64_t(data2[5]) << 40;
-      case 5: h ^= uint64_t(data2[4]) << 32;
-      case 4: h ^= uint64_t(data2[3]) << 24;
-      case 3: h ^= uint64_t(data2[2]) << 16;
-      case 2: h ^= uint64_t(data2[1]) << 8;
-      case 1: h ^= uint64_t(data2[0]);
-              h *= MURMUR_PRIME;
+      case 7:
+        h ^= uint64_t(data2[6]) << 48;
+        [[fallthrough]];
+      case 6:
+        h ^= uint64_t(data2[5]) << 40;
+        [[fallthrough]];
+      case 5:
+        h ^= uint64_t(data2[4]) << 32;
+        [[fallthrough]];
+      case 4:
+        h ^= uint64_t(data2[3]) << 24;
+        [[fallthrough]];
+      case 3:
+        h ^= uint64_t(data2[2]) << 16;
+        [[fallthrough]];
+      case 2:
+        h ^= uint64_t(data2[1]) << 8;
+        [[fallthrough]];
+      case 1:
+        h ^= uint64_t(data2[0]);
+        h *= MURMUR_PRIME;
     }
 
     h ^= h >> MURMUR_R;
@@ -260,13 +273,26 @@ class HashUtil {
     v = 0;
 
     switch (len & 7) {
-      case 7: v ^= static_cast<uint64_t>(pos2[6]) << 48;
-      case 6: v ^= static_cast<uint64_t>(pos2[5]) << 40;
-      case 5: v ^= static_cast<uint64_t>(pos2[4]) << 32;
-      case 4: v ^= static_cast<uint64_t>(pos2[3]) << 24;
-      case 3: v ^= static_cast<uint64_t>(pos2[2]) << 16;
-      case 2: v ^= static_cast<uint64_t>(pos2[1]) << 8;
-      case 1: v ^= static_cast<uint64_t>(pos2[0]);
+      case 7:
+        v ^= static_cast<uint64_t>(pos2[6]) << 48;
+        [[fallthrough]];
+      case 6:
+        v ^= static_cast<uint64_t>(pos2[5]) << 40;
+        [[fallthrough]];
+      case 5:
+        v ^= static_cast<uint64_t>(pos2[4]) << 32;
+        [[fallthrough]];
+      case 4:
+        v ^= static_cast<uint64_t>(pos2[3]) << 24;
+        [[fallthrough]];
+      case 3:
+        v ^= static_cast<uint64_t>(pos2[2]) << 16;
+        [[fallthrough]];
+      case 2:
+        v ^= static_cast<uint64_t>(pos2[1]) << 8;
+        [[fallthrough]];
+      case 1:
+        v ^= static_cast<uint64_t>(pos2[0]);
         h ^= FastHashMix(v);
         h *= m;
     }
diff --git a/be/src/util/jwt-util.cc b/be/src/util/jwt-util.cc
index 63e031c4b..97d0df312 100644
--- a/be/src/util/jwt-util.cc
+++ b/be/src/util/jwt-util.cc
@@ -98,6 +98,7 @@ class JWKSetParser {
       case rapidjson::kNumberType:
         if (value.IsInt()) return "Integer";
         if (value.IsDouble()) return "Float";
+        [[fallthrough]];
       default:
         DCHECK(false);
         return "Unknown";
@@ -908,4 +909,4 @@ Status JWTHelper::GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
   return status;
 }
 
-} // namespace impala
\ No newline at end of file
+} // namespace impala
diff --git a/be/src/util/redactor.cc b/be/src/util/redactor.cc
index fbdda6657..cbdff4400 100644
--- a/be/src/util/redactor.cc
+++ b/be/src/util/redactor.cc
@@ -63,6 +63,7 @@ string NameOfTypeOfJsonValue(const Value& value) {
     case rapidjson::kNumberType:
       if (value.IsInt()) return "Integer";
       if (value.IsDouble()) return "Float";
+      [[fallthrough]];
     default:
       DCHECK(false);
       return "Unknown";
diff --git a/be/src/util/string-parser.h b/be/src/util/string-parser.h
index c20432030..149c5d5eb 100644
--- a/be/src/util/string-parser.h
+++ b/be/src/util/string-parser.h
@@ -154,6 +154,7 @@ class StringParser {
       switch (*s) {
         case '-':
           is_negative = true;
+          [[fallthrough]];
         case '+':
           ++s;
           --len;
@@ -310,6 +311,7 @@ class StringParser {
       case '-':
         negative = true;
         max_val = static_cast<UnsignedT>(std::numeric_limits<T>::max()) + 1;
+        [[fallthrough]];
       case '+': ++i;
     }
 
@@ -366,6 +368,7 @@ class StringParser {
       case '-':
         negative = true;
         max_val = static_cast<UnsignedT>(std::numeric_limits<T>::max()) + 1;
+        [[fallthrough]];
       case '+': i = 1;
     }
 
@@ -472,7 +475,9 @@ class StringParser {
     bool negative = false;
     int i = 0;
     switch (*s) {
-      case '-': negative = true;  // Fallthrough is intended.
+      case '-':
+        negative = true;
+        [[fallthrough]];
       case '+': i = 1;
     }
 
diff --git a/bin/run_clang_tidy.sh b/bin/run_clang_tidy.sh
index 4319873f5..816f201dd 100755
--- a/bin/run_clang_tidy.sh
+++ b/bin/run_clang_tidy.sh
@@ -62,10 +62,15 @@ TMP_STDERR=$(mktemp)
 STRCAT_MESSAGE="Impala-specific note: This can also be fixed using the StrCat() function \
 from be/src/gutil/strings strcat.h)"
 CLANG_STRING_CONCAT="performance-inefficient-string-concatenation"
+FALLTHROUGH_MESSAGE="Impala-specific note: Impala is a C++ 17 codebase, so the preferred \
+way to indicate intended fallthrough is C++ 17's [[fallthrough]]"
+CLANG_FALLTHROUGH="clang-diagnostic-implicit-fallthrough"
 trap "rm $TMP_STDERR" EXIT
 if ! run-clang-tidy.py -quiet -header-filter "${PIPE_DIRS%?}" \
                        -j"${CORES}" ${DIRS} 2> ${TMP_STDERR} | \
-  sed "/${CLANG_STRING_CONCAT}/ s#\$# ${STRCAT_MESSAGE}#";
+   sed "/${CLANG_STRING_CONCAT}/ s#\$# \n${STRCAT_MESSAGE}#" | \
+   sed "/${CLANG_FALLTHROUGH}/ s#\$# \n${FALLTHROUGH_MESSAGE}#" | \
+   sed 's#FALLTHROUGH_INTENDED#[[fallthrough]]#';
 then
   echo "run-clang-tidy.py hit an error, dumping stderr output"
   cat ${TMP_STDERR} >&2