Posted to commits@impala.apache.org by st...@apache.org on 2023/08/30 05:24:39 UTC

[impala] branch master updated (0c8fc997e -> b718d6386)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 0c8fc997e IMPALA-12395: Override scan cardinality for optimized count star
     new 27d37a60c IMPALA-12024: Add catalog profile for createTable
     new b718d6386 IMPALA-11535: Skip older events in the event processor based on the latestRefreshEventID

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/catalog/catalog-server.cc                   |  10 +
 be/src/service/client-request-state.cc             |  17 +-
 be/src/service/impala-server.cc                    |  29 ++-
 be/src/service/impala-server.h                     |  10 +-
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 common/thrift/CatalogService.thrift                |   4 +
 .../java/org/apache/impala/catalog/Catalog.java    |  10 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |  17 +-
 .../main/java/org/apache/impala/catalog/Table.java |   2 +-
 .../impala/catalog/events/MetastoreEvents.java     |  57 +++++-
 .../catalog/events/MetastoreEventsProcessor.java   |  11 ++
 .../org/apache/impala/service/BackendConfig.java   |   9 +
 .../apache/impala/service/CatalogOpExecutor.java   | 204 +++++++++++++++------
 .../java/org/apache/impala/service/Frontend.java   |   2 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  67 ++++++-
 .../events/MetastoreEventsProcessorTest.java       |  44 +++++
 tests/custom_cluster/test_events_custom_configs.py |  64 +++++++
 tests/query_test/test_observability.py             |  60 ++++++
 19 files changed, 533 insertions(+), 88 deletions(-)


[impala] 01/02: IMPALA-12024: Add catalog profile for createTable

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 27d37a60c954d7734cfaea35e73939d5c7681d55
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Aug 16 10:59:47 2023 +0800

    IMPALA-12024: Add catalog profile for createTable
    
    DDLs and DMLs can spend a long time executing in catalogd. Currently,
    the profiles only have the CatalogOpExecTimer counter. We need more
    items to show how these statements are executed in catalogd.
    
    As a first step, this patch adds a profile timeline showing how the
    createTable DDL is executed in the Catalog Server. It also adds events
    to the existing "Query Timeline" for when the CatalogDdlRequest
    finishes and when the catalog updates are applied.
    
    To implement this, a 'profile' field is added to TDdlExecResponse to
    carry the execution counters and timeline from catalogd. Currently,
    only the timeline is used; more counters can be added in the future.
    Several methods take a new timeline parameter to mark their progress.
    A timeline event is added after each RPC finishes.
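
The pattern described above (pass a timeline into each method and mark an event once a step finishes) can be pictured with a minimal sketch. This is not Impala's EventSequence class: the `Timeline` class, its `mark_event` and `render` methods, and the scripted clock are hypothetical names used only for illustration, and the output format is simplified to plain nanoseconds.

```python
import time


class Timeline:
    """Hypothetical stand-in for a profile event sequence (not Impala's class)."""

    def __init__(self, name, clock=time.monotonic_ns):
        self.name = name
        self._clock = clock
        self._start = clock()
        self.events = []  # (label, nanoseconds elapsed since the timeline started)

    def mark_event(self, label):
        """Record that a step finished, stamped with the elapsed time."""
        self.events.append((label, self._clock() - self._start))

    def render(self):
        """Return lines of the form '- label: elapsed (delta since previous)'."""
        total = self.events[-1][1] if self.events else 0
        lines = ["%s: %dns" % (self.name, total)]
        previous = 0
        for label, elapsed in self.events:
            lines.append("   - %s: %dns (%dns)" % (label, elapsed, elapsed - previous))
            previous = elapsed
        return lines


def scripted_clock(ticks):
    """Deterministic clock for the demo: returns the given values in order."""
    values = iter(ticks)
    return lambda: next(values)


def demo():
    # The first tick is consumed by the constructor as the start time.
    timeline = Timeline("Catalog Server Operation", scripted_clock([0, 100, 250, 900]))
    timeline.mark_event("Got Metastore client")        # after obtaining the client
    timeline.mark_event("Created table in Metastore")  # after the create RPC returns
    timeline.mark_event("DDL finished")
    return timeline.render()
```

Storing only the elapsed time per event keeps marking cheap; the per-step delta is derived when the timeline is rendered.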
    
    Here is an example where HMS hangs for 26s during a CTAS. I used gdb to
    attach to HMS, as mentioned in the JIRA description. The timeline shows
    that the time is spent in the first HMS RPC, which fetches the current
    HMS event id:
        Catalog Server Operation: 26s560ms
           - Got metastoreDdlLock: 163.977us (163.977us)
           - Got Metastore client: 166.339us (2.362us)
           - Got current Metastore event id 8355270: 26s494ms (26s494ms)
           - Created table in Metastore: 26s558ms (63.507ms)
           - Fetched event batch from Metastore: 26s559ms (1.155ms)
           - Created table in catalog cache: 26s560ms (1.164ms)
           - DDL finished: 26s560ms (65.538us)
        Query Compilation: 164.257ms
           - Metadata of all 1 tables cached: 10.535ms (10.535ms)
           - Analysis finished: 118.324ms (107.788ms)
           - Authorization finished (noop): 118.489ms (164.626us)
           - Value transfer graph computed: 118.830ms (341.792us)
           - Single node plan created: 150.150ms (31.319ms)
           - Runtime filters computed: 150.254ms (103.529us)
           - Distributed plan created: 151.832ms (1.578ms)
           - Planning finished: 164.257ms (12.425ms)
        Query Timeline: 27s304ms
           - Query submitted: 129.658us (129.658us)
           - Planning finished: 170.224ms (170.095ms)
           - CatalogDdlRequest finished: 26s731ms (26s561ms)
           - Applied catalog updates from DDL: 26s740ms (8.752ms)
           - Submit for admission: 26s740ms (22.233us)
           - Completed admission: 26s740ms (286.295us)
           - Ready to start on 3 backends: 26s740ms (155.916us)
           - All 3 execution backends (3 fragment instances) started: 26s751ms (10.864ms)
           - Last row fetched: 26s920ms (168.226ms)
           - Released admission control resources: 26s920ms (27.635us)
           - DML data written: 26s920ms (126.369us)
           - Applied catalog updates from DDL: 26s985ms (65.354ms)
           - DML Metastore update finished: 26s985ms (30.343us)
           - Rows available: 26s985ms (27.050us)
           - Unregister query: 27s304ms (318.661ms)
    
    An example of creating a Kudu table:
        Catalog Server Operation: 1s730ms
           - Got Metastore client: 113.403us (113.403us)
           - Got current Metastore event id 8355276: 974.500us (861.097us)
           - Got Kudu client: 212.123ms (211.148ms)
           - Got kuduDdlLock: 212.128ms (4.680us)
           - Checked table existence in Kudu: 850.786ms (638.658ms)
           - Created table in Kudu: 1s623ms (772.379ms)
           - Got metastoreDdlLock: 1s623ms (397.305us)
           - Got Metastore client: 1s623ms (7.813us)
           - Checked table existence in Metastore: 1s648ms (25.154ms)
           - Created table in Metastore: 1s725ms (76.348ms)
           - Fetched event batch from Metastore: 1s728ms (3.004ms)
           - Created table in catalog cache: 1s730ms (2.024ms)
           - DDL finished: 1s730ms (84.448us)
    
    An example of creating an Iceberg table:
        Catalog Server Operation: 1s573ms
           - Got Metastore client: 141.799us (141.799us)
           - Checked table existence in Metastore: 2.957ms (2.815ms)
           - Got current Metastore event id 8355277: 3.669ms (712.475us)
           - Created table using Iceberg Catalog HIVE_CATALOG: 1s379ms (1s375ms)
           - Fetched event batch from Metastore: 1s381ms (2.188ms)
           - Created table in catalog cache: 1s382ms (1.556ms)
           - Set Iceberg table owner in Metastore: 1s573ms (190.262ms)
           - DDL finished: 1s573ms (59.176us)
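
In the timelines above, each event shows the elapsed time since the start of the section and, in parentheses, the time since the previous event. A rough sketch of how such text could be scanned for the slowest step follows. The helper names `parse_duration_ns` and `slowest_step` are hypothetical, and the duration grammar is an assumption inferred from the samples (only `s`, `ms` and `us` appear there), not a specification of Impala's pretty-printer.

```python
import re

# Nanoseconds per unit. Only s, ms and us appear in the examples above;
# the remaining units are an assumption about the format.
_UNIT_NS = {"h": 3600 * 10**9, "m": 60 * 10**9, "s": 10**9,
            "ms": 10**6, "us": 10**3, "ns": 1}
# Two-letter units are listed first so "ms" is not read as "m" followed by "s".
_TOKEN = re.compile(r"(\d+(?:\.\d+)?)(ns|us|ms|h|m|s)")
# Matches event lines such as "   - Label: 26s558ms (63.507ms)".
_EVENT = re.compile(r"^\s*-\s+(?P<label>.+): (?P<elapsed>\S+) \((?P<delta>\S+)\)\s*$")


def parse_duration_ns(text):
    """Convert a duration such as '26s494ms' or '163.977us' to nanoseconds."""
    return round(sum(float(value) * _UNIT_NS[unit]
                     for value, unit in _TOKEN.findall(text)))


def slowest_step(timeline_text):
    """Return (label, delta_ns) for the event with the largest delta, or None."""
    best = None
    for line in timeline_text.splitlines():
        match = _EVENT.match(line)
        if not match:
            continue  # section headers and blank lines are skipped
        delta = parse_duration_ns(match.group("delta"))
        if best is None or delta > best[1]:
            best = (match.group("label"), delta)
    return best


# First lines of the CTAS example from this commit message.
SAMPLE = """\
Catalog Server Operation: 26s560ms
   - Got metastoreDdlLock: 163.977us (163.977us)
   - Got Metastore client: 166.339us (2.362us)
   - Got current Metastore event id 8355270: 26s494ms (26s494ms)
   - Created table in Metastore: 26s558ms (63.507ms)
"""
```

Calling `slowest_step(SAMPLE)` picks out the event-id fetch, matching the reading of the CTAS example given earlier.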
    
    Tests:
     - Add e2e tests to verify that the DDL timeline events exist in the
       profile
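
As an illustration of the kind of check described in the test note, the sketch below looks for expected labels under a timeline section of a text profile. It is not the test added by this patch: the helpers `timeline_labels` and `missing_labels` are hypothetical, and the sample profile is abridged from the examples in this commit message.

```python
def timeline_labels(profile_text, section):
    """Return the event labels listed directly under '<section>: <duration>'."""
    labels = []
    in_section = False
    for line in profile_text.splitlines():
        stripped = line.strip()
        if not in_section:
            in_section = stripped.startswith(section + ":")
            continue
        if not stripped.startswith("- "):
            break  # reached the next section or an unrelated line
        # Drop the trailing ": <elapsed> (<delta>)" part of the event line.
        labels.append(stripped[2:].rsplit(": ", 1)[0])
    return labels


def missing_labels(profile_text, section, expected):
    """Return the expected labels that do not appear in the section.

    Prefix matching is used because some labels end with a dynamic value,
    such as the Metastore event id.
    """
    found = timeline_labels(profile_text, section)
    return [label for label in expected
            if not any(item.startswith(label) for item in found)]


# Abridged sample, assembled from the examples in this commit message.
PROFILE = """\
Catalog Server Operation: 1s573ms
   - Got Metastore client: 141.799us (141.799us)
   - Got current Metastore event id 8355277: 3.669ms (712.475us)
   - DDL finished: 1s573ms (59.176us)
Query Compilation: 164.257ms
   - Planning finished: 164.257ms (12.425ms)
"""
```

A test built on this would assert that `missing_labels(...)` returns an empty list for the labels it expects a given DDL to produce.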
    
    Change-Id: I3ebf591625e71391a5b23f56ddca8f0ae97b1efa
    Reviewed-on: http://gerrit.cloudera.org:8080/20368
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |  17 +-
 be/src/service/impala-server.cc                    |  29 +++-
 be/src/service/impala-server.h                     |  10 +-
 common/thrift/CatalogService.thrift                |   4 +
 .../java/org/apache/impala/catalog/Catalog.java    |  10 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 186 +++++++++++++++------
 .../java/org/apache/impala/service/Frontend.java   |   2 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  67 ++++++--
 tests/query_test/test_observability.py             |  60 +++++++
 9 files changed, 306 insertions(+), 79 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 0f9764201..0d48c921e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -705,11 +705,20 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   DebugActionNoFail(exec_request_->query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
 
   Status status = catalog_op_executor_->Exec(exec_request_->catalog_op_request);
+  query_events_->MarkEvent("CatalogDdlRequest finished");
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
   }
 
+  if (catalog_op_executor_->ddl_exec_response() != nullptr &&
+      catalog_op_executor_->ddl_exec_response()->__isset.profile) {
+    for (const TEventSequence& catalog_timeline :
+        catalog_op_executor_->ddl_exec_response()->profile.event_sequences) {
+      summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline);
+    }
+  }
+
   // If this is a CTAS request, there will usually be more work to do
   // after executing the CREATE TABLE statement (the INSERT portion of the operation).
   // The exception is if the user specified IF NOT EXISTS and the table already
@@ -725,7 +734,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   // Add newly created table to catalog cache.
   status = parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl, query_options());
+      exec_request_->query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -842,7 +851,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   status = parent_server_->ProcessCatalogUpdateResult(
       resp.result,
-      exec_request_->query_options.sync_ddl, query_options());
+      exec_request_->query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -1597,7 +1606,7 @@ Status ClientRequestState::UpdateCatalog() {
         query_events_->MarkEvent("Transaction committed");
       }
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
-          exec_request_->query_options.sync_ddl, query_options()));
+          exec_request_->query_options.sync_ddl, query_options(), query_events_));
     }
   } else if (InKuduTransaction()) {
     // Commit the Kudu transaction. Clear transaction state if it's successful.
@@ -1757,7 +1766,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
   RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl, query_options()));
+      exec_request_->query_options.sync_ddl, query_options(), query_events_));
 
   // Set the results to be reported to the client.
   SetResultSet(catalog_op_executor_->ddl_exec_response());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d054a5413..1a6863fd7 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2192,7 +2192,7 @@ void ImpalaServer::CatalogUpdateCallback(
 }
 
 void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
-    const TUniqueId& catalog_service_id) {
+    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   // Wait for the update to be processed locally.
   VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
@@ -2203,14 +2203,17 @@ void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    timeline->MarkEvent("Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
+    timeline->MarkEvent(Substitute("Received catalog version $0",
+        catalog_update_version));
     VLOG_QUERY << "Received catalog version: " << catalog_update_version;
   }
 }
 
 void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
-    const TUniqueId& catalog_service_id) {
+    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   int64_t min_req_subscriber_topic_version =
       catalog_update_info_.catalog_topic_version;
@@ -2223,15 +2226,18 @@ void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    timeline->MarkEvent("Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
+    timeline->MarkEvent(Substitute("Received min subscriber topic version $0",
+        min_req_subscriber_topic_version));
     VLOG_QUERY << "Received min subscriber topic version: "
         << min_req_subscriber_topic_version;
   }
 }
 
 void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_version,
-    const TUniqueId& catalog_service_id) {
+    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   int64_t catalog_object_version_lower_bound =
       catalog_update_info_.catalog_object_version_lower_bound;
@@ -2247,8 +2253,11 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    timeline->MarkEvent("Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
+    timeline->MarkEvent(Substitute("Local min catalog version reached $0",
+        min_req_catalog_object_version));
     VLOG_QUERY << "Updated catalog object version lower bound: "
         << min_req_catalog_object_version;
   }
@@ -2256,7 +2265,7 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
 
 Status ImpalaServer::ProcessCatalogUpdateResult(
     const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers,
-    const TQueryOptions& query_options) {
+    const TQueryOptions& query_options, RuntimeProfile::EventSequence* timeline) {
   const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
   if (!catalog_update_result.__isset.updated_catalog_objects &&
       !catalog_update_result.__isset.removed_catalog_objects) {
@@ -2264,15 +2273,16 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     // 'catalog_update_result' to determine when the effects of this operation
     // have been applied to the local catalog cache.
     if (catalog_update_result.is_invalidate) {
-      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id);
+      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id,
+          timeline);
     } else {
-      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id, timeline);
     }
     if (wait_for_all_subscribers) {
       // Now wait for this update to be propagated to all catalog topic subscribers.
       // If we make it here it implies the first condition was met (the update was
       // processed locally or the catalog service id has changed).
-      WaitForCatalogUpdateTopicPropagation(catalog_service_id);
+      WaitForCatalogUpdateTopicPropagation(catalog_service_id, timeline);
     }
   } else {
     TUniqueId cur_service_id;
@@ -2304,6 +2314,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       // Apply the changes to the local catalog cache.
       TUpdateCatalogCacheResponse resp;
       Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
+      timeline->MarkEvent("Applied catalog updates from DDL");
       if (!status.ok()) LOG(ERROR) << status.GetDetail();
       RETURN_IF_ERROR(status);
     } else {
@@ -2336,10 +2347,10 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     if (!wait_for_all_subscribers) return Status::OK();
     // Wait until we receive and process the catalog update that covers the effects
     // (catalog objects) of this operation.
-    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id, timeline);
     // Now wait for this update to be propagated to all catalog topic
     // subscribers.
-    WaitForCatalogUpdateTopicPropagation(catalog_service_id);
+    WaitForCatalogUpdateTopicPropagation(catalog_service_id, timeline);
   }
   return Status::OK();
 }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index cc3ff88c6..61f593a7c 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -423,24 +423,26 @@ class ImpalaServer : public ImpalaServiceIf,
   ///
   /// 'query_options' is used for running debug actions.
   Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
-      bool wait_for_all_subscribers, const TQueryOptions& query_options)
+      bool wait_for_all_subscribers, const TQueryOptions& query_options,
+      RuntimeProfile::EventSequence* timeline)
       WARN_UNUSED_RESULT;
 
   /// Wait until the catalog update with version 'catalog_update_version' is
   /// received and applied in the local catalog cache or until the catalog
   /// service id has changed.
   void WaitForCatalogUpdate(const int64_t catalog_update_version,
-      const TUniqueId& catalog_service_id);
+      const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline);
 
   /// Wait until the minimum catalog object version in the local cache is
   /// greater than 'min_catalog_update_version' or until the catalog
   /// service id has changed.
   void WaitForMinCatalogUpdate(const int64_t min_catalog_update_version,
-      const TUniqueId& catalog_service_id);
+      const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline);
 
   /// Wait until the last applied catalog update has been broadcast to
   /// all coordinators or until the catalog service id has changed.
-  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id);
+  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id,
+      RuntimeProfile::EventSequence* timeline);
 
   /// Returns true if lineage logging is enabled, false otherwise.
   ///
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index bb3482cec..f309591f2 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -23,6 +23,7 @@ include "JniCatalog.thrift"
 include "Types.thrift"
 include "Status.thrift"
 include "Results.thrift"
+include "RuntimeProfile.thrift"
 include "hive_metastore.thrift"
 include "SqlConstraints.thrift"
 
@@ -209,6 +210,9 @@ struct TDdlExecResponse {
   // created table. This is useful for establishing lineage between table and it's
   // location for external tables.
   6: optional string table_location
+
+  // Profile of the DDL execution in catalogd
+  7: optional RuntimeProfile.TRuntimeProfileNode profile
 }
 
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 62c3c92ae..78aae22d1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -49,6 +49,7 @@ import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.PatternMatcher;
 
 import com.google.common.base.Joiner;
@@ -396,6 +397,15 @@ public abstract class Catalog implements AutoCloseable {
    */
   public MetaStoreClient getMetaStoreClient() { return metaStoreClientPool_.getClient(); }
 
+  /**
+   * Same as the above but also update the given 'timeline'.
+   */
+  public MetaStoreClient getMetaStoreClient(EventSequence timeline) {
+    MetaStoreClient client = getMetaStoreClient();
+    if (timeline != null) timeline.markEvent("Got Metastore client");
+    return client;
+  }
+
   /**
    * Return all members of 'candidates' that match 'matcher'.
    * The results are sorted in String.CASE_INSENSITIVE_ORDER.
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f569dfbd8..13d5e4161 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -228,6 +228,7 @@ import org.apache.impala.thrift.TResetMetadataResponse;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
@@ -245,6 +246,7 @@ import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.DebugUtils;
+import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
@@ -366,6 +368,27 @@ public class CatalogOpExecutor {
   // Table capabilities property name
   public static final String CAPABILITIES_KEY = "OBJCAPABILITIES";
 
+  // Labels used in catalog timelines
+  private static final String CHECKED_HMS_TABLE_EXISTENCE =
+      "Checked table existence in Metastore";
+  private static final String CREATED_HMS_TABLE = "Created table in Metastore";
+  private static final String CREATED_CATALOG_TABLE = "Created table in catalog cache";
+  private static final String CREATED_ICEBERG_TABLE =
+      "Created table using Iceberg Catalog ";
+  private static final String DDL_FINISHED = "DDL finished";
+  private static final String FETCHED_LATEST_HMS_EVENT_ID =
+      "Got current Metastore event id ";
+  private static final String FETCHED_HMS_EVENT_BATCH =
+      "Fetched event batch from Metastore";
+  private static final String FETCHED_HMS_TABLE = "Fetched table from Metastore";
+  private static final String GOT_METASTORE_DDL_LOCK = "Got metastoreDdlLock";
+  private static final String GOT_TABLE_WRITE_LOCK = "Got table write lock";
+  private static final String LOADED_ICEBERG_TABLE = "Loaded iceberg table";
+  private static final String SENT_CATALOG_FOR_SYNC_DDL =
+      "Sent catalog update for sync_ddl";
+  private static final String SET_ICEBERG_TABLE_OWNER =
+      "Set Iceberg table owner in Metastore";
+
   private final CatalogServiceCatalog catalog_;
   private final AuthorizationConfig authzConfig_;
   private final AuthorizationManager authzManager_;
@@ -395,6 +418,7 @@ public class CatalogOpExecutor {
 
   public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
       throws ImpalaException {
+    EventSequence catalogTimeline = new EventSequence("Catalog Server Operation");
     TDdlExecResponse response = new TDdlExecResponse();
     response.setResult(new TCatalogUpdateResult());
     response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
@@ -443,28 +467,29 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(create_table_as_select_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
           response.setNew_table_created(createTable(create_table_as_select_params,
-              response, syncDdl, wantMinimalResult));
+              response, catalogTimeline, syncDdl, wantMinimalResult));
           break;
         case CREATE_TABLE:
           TCreateTableParams create_table_params = ddlRequest.getCreate_table_params();
           tTableName = Optional.of((create_table_params.getTable_name()));
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createTable(ddlRequest.getCreate_table_params(), response, syncDdl,
-              wantMinimalResult);
+          createTable(ddlRequest.getCreate_table_params(), response, catalogTimeline,
+              syncDdl, wantMinimalResult);
           break;
         case CREATE_TABLE_LIKE:
           TCreateTableLikeParams create_table_like_params =
               ddlRequest.getCreate_table_like_params();
           tTableName = Optional.of(create_table_like_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createTableLike(create_table_like_params, response, syncDdl, wantMinimalResult);
+          createTableLike(create_table_like_params, response, catalogTimeline, syncDdl,
+              wantMinimalResult);
           break;
         case CREATE_VIEW:
           TCreateOrAlterViewParams create_view_params =
               ddlRequest.getCreate_view_params();
           tTableName = Optional.of(create_view_params.getView_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createView(create_view_params, wantMinimalResult, response);
+          createView(create_view_params, wantMinimalResult, response, catalogTimeline);
           break;
         case CREATE_FUNCTION:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -559,6 +584,7 @@ public class CatalogOpExecutor {
           throw new IllegalStateException(
               "Unexpected DDL exec request type: " + ddl_type);
       }
+      catalogTimeline.markEvent(DDL_FINISHED);
 
       // If SYNC_DDL is set, set the catalog update that contains the results of this DDL
       // operation. The version of this catalog update is returned to the requesting
@@ -567,8 +593,12 @@ public class CatalogOpExecutor {
       if (syncDdl) {
         response.getResult().setVersion(
             catalog_.waitForSyncDdlVersion(response.getResult()));
+        catalogTimeline.markEvent(SENT_CATALOG_FOR_SYNC_DDL);
       }
 
+      TRuntimeProfileNode profile = Frontend.createTRuntimeProfileNode("CatalogOp");
+      profile.addToEvent_sequences(catalogTimeline.toThrift());
+      response.setProfile(profile);
       // At this point, the operation is considered successful. If any errors occurred
       // during execution, this function will throw an exception and the CatalogServer
       // will handle setting a bad status code.
@@ -2054,9 +2084,22 @@ public class CatalogOpExecutor {
    */
   private List<NotificationEvent> getNextMetastoreEventsIfEnabled(long eventId,
       NotificationFilter eventsFilter) throws MetastoreNotificationException {
+    return getNextMetastoreEventsIfEnabled(null, eventId, eventsFilter);
+  }
+
+  /**
+   * Same as the above but also update the given 'catalogTimeline'.
+   */
+  private List<NotificationEvent> getNextMetastoreEventsIfEnabled(
+      EventSequence catalogTimeline, long eventId, NotificationFilter eventsFilter)
+      throws MetastoreNotificationException {
     if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
-    return MetastoreEventsProcessor
+    List<NotificationEvent> events = MetastoreEventsProcessor
         .getNextMetastoreEventsInBatches(catalog_, eventId, eventsFilter);
+    if (catalogTimeline != null) {
+      catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
+    }
+    return events;
   }
 
   /**
@@ -2230,8 +2273,20 @@ public class CatalogOpExecutor {
    * Returns the latest notification event id from the Hive metastore.
    */
   private long getCurrentEventId(MetaStoreClient msClient) throws ImpalaRuntimeException {
+    return getCurrentEventId(msClient, null);
+  }
+
+  /**
+   * Same as the above but also update the given 'catalogTimeline'.
+   */
+  private long getCurrentEventId(MetaStoreClient msClient, EventSequence catalogTimeline)
+      throws ImpalaRuntimeException {
     try {
-      return msClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+      long id = msClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+      if (catalogTimeline != null) {
+        catalogTimeline.markEvent(FETCHED_LATEST_HMS_EVENT_ID + id);
+      }
+      return id;
     } catch (TException e) {
       throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
           "getCurrentNotificationEventId") + e
@@ -3290,7 +3345,8 @@ public class CatalogOpExecutor {
    * otherwise.
    */
   private boolean createTable(TCreateTableParams params, TDdlExecResponse response,
-      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
+      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult)
+      throws ImpalaException {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
@@ -3306,6 +3362,7 @@ public class CatalogOpExecutor {
       LOG.trace("Skipping table creation because {} already exists and " +
           "IF NOT EXISTS was specified.", tableName);
       tryWriteLock(existingTbl);
+      catalogTimeline.markEvent(GOT_TABLE_WRITE_LOCK);
       try {
         if (syncDdl) {
           // When SYNC_DDL is enabled and the table already exists, we force a version
@@ -3334,18 +3391,18 @@ public class CatalogOpExecutor {
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.trace("Creating table {}", tableName);
     if (KuduTable.isKuduTable(tbl)) {
-      return createKuduTable(tbl, params, wantMinimalResult, response);
+      return createKuduTable(tbl, params, wantMinimalResult, response, catalogTimeline);
     } else if (IcebergTable.isIcebergTable(tbl)) {
-      return createIcebergTable(tbl, wantMinimalResult, response, params.if_not_exists,
-          params.getColumns(), params.getPartition_spec(), params.getTable_properties(),
-          params.getComment());
+      return createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
+          params.if_not_exists, params.getColumns(), params.getPartition_spec(),
+          params.getTable_properties(), params.getComment());
     }
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
     MetastoreShim.setTableLocation(catalog_.getDb(tbl.getDbName()), tbl);
     return createTable(tbl, params.if_not_exists, params.getCache_op(),
         params.server_name, params.getPrimary_keys(), params.getForeign_keys(),
-        wantMinimalResult, response);
+        wantMinimalResult, response, catalogTimeline);
   }
 
   /**
@@ -3453,42 +3510,44 @@ public class CatalogOpExecutor {
    * table was created as part of this call, false otherwise.
    */
   private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse response)
-      throws ImpalaException {
+      TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse response,
+      EventSequence catalogTimeline) throws ImpalaException {
     Preconditions.checkState(KuduTable.isKuduTable(newTable));
     boolean createHMSTable;
     long eventId;
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      eventId = getCurrentEventId(msClient);
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
+      eventId = getCurrentEventId(msClient, catalogTimeline);
     }
     if (!KuduTable.isSynchronizedTable(newTable)) {
       // if this is not a synchronized table, we assume that the table must be existing
       // in kudu and use the column spec from Kudu
-      KuduCatalogOpExecutor.populateExternalTableColsFromKudu(newTable);
+      KuduCatalogOpExecutor.populateExternalTableColsFromKudu(catalogTimeline, newTable);
       createHMSTable = true;
     } else {
       // if this is a synchronized table (managed or external.purge table) then we
       // create it in Kudu first
-      KuduCatalogOpExecutor.createSynchronizedTable(newTable, params);
+      KuduCatalogOpExecutor.createSynchronizedTable(catalogTimeline, newTable, params);
       createHMSTable = !isKuduHmsIntegrationEnabled(newTable);
     }
     try {
       // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
-      List<NotificationEvent> events = Collections.EMPTY_LIST;
-      getMetastoreDdlLock().lock();
+      List<NotificationEvent> events = Collections.emptyList();
+      acquireMetastoreDdlLock(catalogTimeline);
       if (createHMSTable) {
-        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
           boolean tableInMetastore =
               msClient.getHiveClient().tableExists(newTable.getDbName(),
                                                    newTable.getTableName());
+          catalogTimeline.markEvent(CHECKED_HMS_TABLE_EXISTENCE);
           if (!tableInMetastore) {
             msClient.getHiveClient().createTable(newTable);
+            catalogTimeline.markEvent(CREATED_HMS_TABLE);
           } else {
             addSummary(response, "Table already exists.");
             return false;
           }
-          events = getNextMetastoreEventsIfEnabled(eventId,
+          events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId,
                   event -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
                       .equals(event.getEventType())
                       && newTable.getDbName().equalsIgnoreCase(event.getDbName())
@@ -3505,12 +3564,13 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTable =
           eventTblPair == null ? null : eventTblPair.second;
       setTableNameAndCreateTimeInResponse(msTable,
-          newTable.getDbName(), newTable.getTableName(), response);
+          newTable.getDbName(), newTable.getTableName(), response, catalogTimeline);
 
       // Add the table to the catalog cache
       Table newTbl = catalog_
           .addIncompleteTable(newTable.getDbName(), newTable.getTableName(),
               TImpalaTableType.TABLE, params.getComment(), createEventId);
+      catalogTimeline.markEvent(CREATED_CATALOG_TABLE);
       LOG.debug("Created a Kudu table {} with create event id {}", newTbl.getFullName(),
           createEventId);
       addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
@@ -3551,26 +3611,28 @@ public class CatalogOpExecutor {
   private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
       boolean if_not_exists, THdfsCachingOp cacheOp, String serverName,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
-      boolean wantMinimalResult, TDdlExecResponse response) throws ImpalaException {
+      boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline)
+      throws ImpalaException {
     Preconditions.checkState(!KuduTable.isKuduTable(newTable));
-    getMetastoreDdlLock().lock();
+    acquireMetastoreDdlLock(catalogTimeline);
     try {
       org.apache.hadoop.hive.metastore.api.Table msTable;
       Pair<Long, org.apache.hadoop.hive.metastore.api.Table> eventIdTblPair = null;
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        long eventId = getCurrentEventId(msClient);
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
+        long eventId = getCurrentEventId(msClient, catalogTimeline);
         if (primaryKeys == null && foreignKeys == null) {
           msClient.getHiveClient().createTable(newTable);
         } else {
           MetastoreShim.createTableWithConstraints(
               msClient.getHiveClient(), newTable,
-              primaryKeys == null ? new ArrayList<>() : primaryKeys,
-              foreignKeys == null ? new ArrayList<>() : foreignKeys);
+              primaryKeys == null ? Collections.emptyList() : primaryKeys,
+              foreignKeys == null ? Collections.emptyList() : foreignKeys);
         }
-
+        catalogTimeline.markEvent(CREATED_HMS_TABLE);
         addSummary(response, "Table has been created.");
         final org.apache.hadoop.hive.metastore.api.Table finalNewTable = newTable;
-        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(eventId,
+        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
+            catalogTimeline, eventId,
             notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
                 .equals(notificationEvent.getEventType())
                 && finalNewTable.getDbName()
@@ -3583,10 +3645,11 @@ public class CatalogOpExecutor {
           // not atomic.
           eventIdTblPair = new Pair<>(-1L, msClient.getHiveClient()
               .getTable(newTable.getDbName(), newTable.getTableName()));
+          catalogTimeline.markEvent(FETCHED_HMS_TABLE);
         }
         msTable = eventIdTblPair.second;
         setTableNameAndCreateTimeInResponse(msTable, newTable.getDbName(),
-            newTable.getTableName(), response);
+            newTable.getTableName(), response, catalogTimeline);
         // For external tables set table location needed for lineage generation.
         if (newTable.getTableType() == TableType.EXTERNAL_TABLE.toString()) {
           String tableLocation = newTable.getSd().getLocation();
@@ -3611,6 +3674,7 @@ public class CatalogOpExecutor {
           MetadataOp.getTableComment(msTable),
           eventIdTblPair.first);
       Preconditions.checkNotNull(newTbl);
+      catalogTimeline.markEvent(CREATED_CATALOG_TABLE);
       LOG.debug("Created catalog table {} with create event id {}", newTbl.getFullName(),
           eventIdTblPair.first);
       // Submit the cache request and update the table metadata.
@@ -3651,10 +3715,12 @@ public class CatalogOpExecutor {
    */
   private void setTableNameAndCreateTimeInResponse(
       org.apache.hadoop.hive.metastore.api.Table msTable, String dbName, String tblName,
-      TDdlExecResponse response) throws org.apache.thrift.TException {
+      TDdlExecResponse response, EventSequence catalogTimeline)
+      throws org.apache.thrift.TException {
     if (msTable == null) {
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
         msTable = msClient.getHiveClient().getTable(dbName, tblName);
+        catalogTimeline.markEvent(FETCHED_HMS_TABLE + " to get create time");
       }
     }
     response.setTable_name(dbName + "." + tblName);
@@ -3667,9 +3733,9 @@ public class CatalogOpExecutor {
    * exceptions encountered during the create.
    */
   private void createView(TCreateOrAlterViewParams params, boolean wantMinimalResult,
-      TDdlExecResponse response) throws ImpalaException {
+      TDdlExecResponse response, EventSequence catalogTimeline) throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getView_name());
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
+    Preconditions.checkState(tableName.isFullyQualified());
     Preconditions.checkState(params.getColumns() != null &&
         params.getColumns().size() > 0,
           "Null or empty column list given as argument to DdlExecutor.createView");
@@ -3687,8 +3753,9 @@ public class CatalogOpExecutor {
         new org.apache.hadoop.hive.metastore.api.Table();
     setCreateViewAttributes(params, view);
     LOG.trace(String.format("Creating view %s", tableName));
-    if (!createTable(view, params.if_not_exists, null, params.server_name,
-        new ArrayList<>(), new ArrayList<>(), wantMinimalResult, response)) {
+    if (!createTable(view, params.if_not_exists, /*cacheOp*/null, params.server_name,
+        /*primaryKeys*/null, /*foreignKeys*/null, wantMinimalResult, response,
+        catalogTimeline)) {
       addSummary(response, "View already exists.");
     } else {
       addSummary(response, "View has been created.");
@@ -3699,8 +3766,8 @@ public class CatalogOpExecutor {
    * Creates a new Iceberg table.
    */
   private boolean createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      boolean wantMinimalResult, TDdlExecResponse response, boolean ifNotExists,
-      List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
+      boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline,
+      boolean ifNotExists, List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
       Map<String, String> tableProperties, String tblComment) throws ImpalaException {
     Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
 
@@ -3708,13 +3775,14 @@ public class CatalogOpExecutor {
     try {
       // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
-      List<NotificationEvent> events = Collections.emptyList();
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      List<NotificationEvent> events;
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
         boolean tableInMetastore =
             msClient.getHiveClient().tableExists(newTable.getDbName(),
                 newTable.getTableName());
+        catalogTimeline.markEvent(CHECKED_HMS_TABLE_EXISTENCE);
         if (!tableInMetastore) {
-          long eventId = getCurrentEventId(msClient);
+          long eventId = getCurrentEventId(msClient, catalogTimeline);
           TIcebergCatalog catalog = IcebergUtil.getTIcebergCatalog(newTable);
           String location = newTable.getSd().getLocation();
           //Create table in iceberg if necessary
@@ -3737,6 +3805,7 @@ public class CatalogOpExecutor {
                 IcebergUtil.getIcebergTableIdentifier(newTable), location, columns,
                 partitionSpec, tableProperties).location();
             newTable.getSd().setLocation(tableLoc);
+            catalogTimeline.markEvent(CREATED_ICEBERG_TABLE + catalog.name());
           } else {
             // If this is not a synchronized table, we assume that the table must be
             // existing in an Iceberg Catalog.
@@ -3758,6 +3827,7 @@ public class CatalogOpExecutor {
             TableIdentifier identifier = IcebergUtil.getIcebergTableIdentifier(newTable);
             org.apache.iceberg.Table iceTable = IcebergUtil.loadTable(
                 catalog, identifier, locationToLoadFrom, newTable.getParameters());
+            catalogTimeline.markEvent(LOADED_ICEBERG_TABLE);
             // Populate the HMS table schema based on the Iceberg table's schema because
             // the Iceberg metadata is the source of truth. This also avoids an
             // unnecessary ALTER TABLE.
@@ -3776,8 +3846,9 @@ public class CatalogOpExecutor {
               newTable.getPartitionKeys().isEmpty());
           if (!isIcebergHmsIntegrationEnabled(newTable)) {
             msClient.getHiveClient().createTable(newTable);
+            catalogTimeline.markEvent(CREATED_HMS_TABLE);
           }
-          events = getNextMetastoreEventsIfEnabled(eventId, event ->
+          events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, event ->
               CreateTableEvent.CREATE_TABLE_EVENT_TYPE.equals(event.getEventType())
                   && newTable.getDbName().equalsIgnoreCase(event.getDbName())
                   && newTable.getTableName().equalsIgnoreCase(event.getTableName()));
@@ -3792,11 +3863,12 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTable =
           eventTblPair == null ? null : eventTblPair.second;
       setTableNameAndCreateTimeInResponse(msTable,
-          newTable.getDbName(), newTable.getTableName(), response);
+          newTable.getDbName(), newTable.getTableName(), response, catalogTimeline);
       // Add the table to the catalog cache
       Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
           newTable.getTableName(), TImpalaTableType.TABLE, tblComment,
           createEventId);
+      catalogTimeline.markEvent(CREATED_CATALOG_TABLE);
       LOG.debug("Created an iceberg table {} in catalog with create event Id {} ",
           newTbl.getFullName(), createEventId);
       addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
@@ -3808,6 +3880,7 @@ public class CatalogOpExecutor {
         // extra "ALTER TABLE SET OWNER" step is required.
         setIcebergTableOwnerAfterCreateTable(newTable.getDbName(),
             newTable.getTableName(), newTable.getOwner());
+        catalogTimeline.markEvent(SET_ICEBERG_TABLE_OWNER);
         LOG.debug("Table owner has been changed to " + newTable.getOwner());
       } catch (Exception e) {
         LOG.warn("Failed to set table owner after creating " +
@@ -3852,7 +3925,8 @@ public class CatalogOpExecutor {
    * @param  syncDdl tells is SYNC_DDL is enabled for this DDL request.
    */
   private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response,
-      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
+      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult)
+      throws ImpalaException {
     Preconditions.checkNotNull(params);
     THdfsFileFormat fileFormat =
         params.isSetFile_format() ? params.getFile_format() : null;
@@ -3985,16 +4059,18 @@ public class CatalogOpExecutor {
       for (Column col: srcIceTable.getColumns()) columns.add(col.toThrift());
       TIcebergPartitionSpec partitionSpec = srcIceTable.getDefaultPartitionSpec()
           .toThrift();
-      createIcebergTable(tbl, wantMinimalResult, response, params.if_not_exists, columns,
-          partitionSpec, tableProperties, params.getComment());
+      createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
+          params.if_not_exists, columns, partitionSpec, tableProperties,
+          params.getComment());
     } else if (srcTable instanceof KuduTable && KuduTable.isKuduTable(tbl)) {
       TCreateTableParams createTableParams =
           extractKuduCreateTableParams(params, tblName, (KuduTable) srcTable, tbl);
-      createKuduTable(tbl, createTableParams, wantMinimalResult, response);
+      createKuduTable(tbl, createTableParams, wantMinimalResult, response,
+          catalogTimeline);
     } else {
       MetastoreShim.setTableLocation(catalog_.getDb(tbl.getDbName()), tbl);
       createTable(tbl, params.if_not_exists, null, params.server_name, null, null,
-          wantMinimalResult, response);
+          wantMinimalResult, response, catalogTimeline);
     }
   }
 
@@ -4870,6 +4946,14 @@ public class CatalogOpExecutor {
     return metastoreDdlLock_;
   }
 
+  /**
+   * Acquires 'metastoreDdlLock_' and also updates 'catalogTimeline' when it's done.
+   */
+  private void acquireMetastoreDdlLock(EventSequence catalogTimeline) {
+    metastoreDdlLock_.lock();
+    catalogTimeline.markEvent(GOT_METASTORE_DDL_LOCK);
+  }
+
   /**
    * Adds partitions in 'allHmsPartitionsToAdd' in batches via 'msClient'.
    * Returns the created partitions.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index b2124a1a4..794a65a79 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2287,7 +2287,7 @@ public class Frontend {
     }
   }
 
-  private static TRuntimeProfileNode createTRuntimeProfileNode(
+  public static TRuntimeProfileNode createTRuntimeProfileNode(
       String childrenProfileName) {
     return new TRuntimeProfileNode(childrenProfileName,
         /*num_children=*/0,
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 901bf3da8..738e21fbf 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -41,6 +41,7 @@ import org.apache.impala.thrift.TKuduPartitionParam;
 import org.apache.impala.thrift.TKuduPartitionByHashParam;
 import org.apache.impala.thrift.TRangePartition;
 import org.apache.impala.thrift.TRangePartitionOperationType;
+import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
@@ -70,16 +71,59 @@ import com.google.common.collect.Sets;
 public class KuduCatalogOpExecutor {
   public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);
 
+  // Labels used in catalog timeline
+  private static final String CHECKED_KUDU_TABLE_EXISTENCE =
+      "Checked table existence in Kudu";
+  private static final String CREATED_KUDU_TABLE = "Created table in Kudu";
+  private static final String GOT_KUDU_CLIENT = "Got Kudu client";
+  private static final String GOT_KUDU_DDL_LOCK = "Got kuduDdlLock";
+  private static final String OPENED_KUDU_TABLE = "Opened Kudu table";
+  private static final String POPULATED_COLS_FROM_KUDU =
+      "Populated external table cols from Kudu";
+
   private static final Object kuduDdlLock_ = new Object();
 
+  /**
+   * Wrapper to get kudu client and mark the given 'catalogTimeline' when it finishes.
+   */
+  private static KuduClient getKuduClient(String masterHosts,
+      EventSequence catalogTimeline) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    catalogTimeline.markEvent(GOT_KUDU_CLIENT);
+    return kudu;
+  }
+
+  /**
+   * Wrapper to check kudu table existence and mark the given 'catalogTimeline' when it
+   * finishes.
+   */
+  private static boolean checkTableExistence(KuduClient client, String kuduTableName,
+      EventSequence catalogTimeline) throws KuduException {
+    boolean tableExists = client.tableExists(kuduTableName);
+    catalogTimeline.markEvent(CHECKED_KUDU_TABLE_EXISTENCE);
+    return tableExists;
+  }
+
+  /**
+   * Wrapper to create the kudu table and mark the given 'catalogTimeline' when it
+   * finishes.
+   */
+  private static org.apache.kudu.client.KuduTable createKuduTable(KuduClient client,
+      String name, Schema schema, CreateTableOptions tableOpts,
+      EventSequence catalogTimeline) throws KuduException {
+    org.apache.kudu.client.KuduTable table = client.createTable(name, schema, tableOpts);
+    catalogTimeline.markEvent(CREATED_KUDU_TABLE);
+    return table;
+  }
+
   /**
    * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'.
    * Throws an exception if 'msTbl' represents an external table or if the table couldn't
    * be created in Kudu.
    */
-  public static void createSynchronizedTable(
-          org.apache.hadoop.hive.metastore.api.Table msTbl,
-          TCreateTableParams params) throws ImpalaRuntimeException {
+  public static void createSynchronizedTable(EventSequence catalogTimeline,
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TCreateTableParams params) throws ImpalaRuntimeException {
     Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
     Preconditions.checkState(
         msTbl.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
@@ -89,13 +133,14 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Creating table '%s' in master '%s'", kuduTableName,
           masterHosts));
     }
-    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    KuduClient kudu = getKuduClient(masterHosts, catalogTimeline);
     try {
       // Acquire lock to protect table existence check and table creation, see IMPALA-8984
       synchronized (kuduDdlLock_) {
+        catalogTimeline.markEvent(GOT_KUDU_DDL_LOCK);
         // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
         // (see KUDU-1710).
-        boolean tableExists = kudu.tableExists(kuduTableName);
+        boolean tableExists = checkTableExistence(kudu, kuduTableName, catalogTimeline);
         if (tableExists && params.if_not_exists) return;
 
         // if table is managed or external with external.purge.table = true in
@@ -107,8 +152,8 @@ public class KuduCatalogOpExecutor {
         Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
         Schema schema = createTableSchema(params);
         CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
-        org.apache.kudu.client.KuduTable table =
-            kudu.createTable(kuduTableName, schema, tableOpts);
+        org.apache.kudu.client.KuduTable table = createKuduTable(
+            kudu, kuduTableName, schema, tableOpts, catalogTimeline);
         // Populate table ID from Kudu table if Kudu's integration with the Hive
         // Metastore is enabled.
         if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
@@ -312,7 +357,7 @@ public class KuduCatalogOpExecutor {
    * an equivalent schema for external tables. Throws an exception if any errors
    * are encountered.
    */
-  public static void populateExternalTableColsFromKudu(
+  public static void populateExternalTableColsFromKudu(EventSequence catalogTimeline,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
     org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
     List<FieldSchema> cols = msTblCopy.getSd().getCols();
@@ -327,13 +372,14 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Loading schema of table '%s' from master '%s'",
           kuduTableName, masterHosts));
     }
-    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    KuduClient kudu = getKuduClient(masterHosts, catalogTimeline);
     try {
-      if (!kudu.tableExists(kuduTableName)) {
+      if (!checkTableExistence(kudu, kuduTableName, catalogTimeline)) {
         throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " +
             "'%s'", kuduTableName));
       }
       org.apache.kudu.client.KuduTable kuduTable = kudu.openTable(kuduTableName);
+      catalogTimeline.markEvent(OPENED_KUDU_TABLE);
       // Replace the columns in the Metastore table with the columns from the recently
       // accessed Kudu schema.
       cols.clear();
@@ -358,6 +404,7 @@ public class KuduCatalogOpExecutor {
     List<FieldSchema> newCols = msTbl.getSd().getCols();
     newCols.clear();
     newCols.addAll(cols);
+    catalogTimeline.markEvent(POPULATED_COLS_FROM_KUDU);
   }
 
   /**
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 175b56568..d408b10ee 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -440,6 +440,14 @@ class TestObservability(ImpalaTestSuite):
       r'Query Timeline:',
       r'Planning finished'
     ]
+    # Events for DDLs. Currently just CreateTable has them.
+    ddl_event_regexes = [
+      r'Catalog Server Operation:',
+      r'Got metastoreDdlLock:',
+      r'Created table in Metastore:',
+      r'Created table in catalog cache:',
+      r'DDL finished:',
+    ]
     # queries that explore different code paths in Frontend compilation
     queries = [
       'create table if not exists impala_6568 (i int)',
@@ -454,6 +462,10 @@ class TestObservability(ImpalaTestSuite):
       runtime_profile = self.execute_query(query).runtime_profile
       # and check that all the expected events appear in the resulting profile
       self.__verify_profile_contains_every_event(event_regexes, runtime_profile, query)
+      # Verify catalogOp timeline
+      if query.startswith("create table"):
+        self.__verify_profile_contains_every_event(
+            ddl_event_regexes, runtime_profile, query)
 
   def __verify_profile_contains_every_event(self, event_regexes, runtime_profile, query):
     """Test that all the expected events show up in a given query profile."""
@@ -462,6 +474,54 @@ class TestObservability(ImpalaTestSuite):
           "Didn't find event '" + regex + "' for query '" + query + \
           "' in profile: \n" + runtime_profile
 
+  def test_create_table_profile_events(self, unique_database):
+    # Create normal table
+    stmt = "create table %s.t1(id int)" % unique_database
+    self.__verify_event_labels_in_profile(stmt, [
+        "Got metastoreDdlLock",
+        "Got Metastore client",
+        "Got current Metastore event id",
+        "Created table in Metastore",
+        "Fetched event batch from Metastore",
+        "Created table in catalog cache",
+        "DDL finished"
+    ])
+    # Create Kudu table
+    stmt = "create table %s.t2(id int, name string, primary key(id))" \
+           " partition by hash(id) partitions 3 stored as kudu" % unique_database
+    self.__verify_event_labels_in_profile(stmt, [
+        "Got Metastore client",
+        "Got current Metastore event id",
+        "Got Kudu client",
+        "Got kuduDdlLock",
+        "Checked table existence in Kudu",
+        "Created table in Kudu",
+        "Got metastoreDdlLock",
+        "Got Metastore client",
+        "Checked table existence in Metastore",
+        "Created table in Metastore",
+        "Fetched event batch from Metastore",
+        "Created table in catalog cache",
+        "DDL finished",
+    ])
+    # Create Iceberg table
+    stmt = "create table %s.t3(id int) stored as iceberg" % unique_database
+    self.__verify_event_labels_in_profile(stmt, [
+        "Got Metastore client",
+        "Checked table existence in Metastore",
+        "Got current Metastore event id",
+        "Created table using Iceberg Catalog HIVE_CATALOG",
+        "Fetched event batch from Metastore",
+        "Created table in catalog cache",
+        "Set Iceberg table owner in Metastore",
+        "DDL finished",
+    ])
+
+  def __verify_event_labels_in_profile(self, stmt, event_labels):
+    profile = self.execute_query(stmt).runtime_profile
+    for label in event_labels:
+      assert label in profile
+
   def test_compute_stats_profile(self, unique_database):
     """Test that the profile for a 'compute stats' query contains three unique query ids:
     one for the parent 'compute stats' query and one each for the two child queries."""
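
Note on the pattern used throughout this patch: each blocking step (taking the DDL lock, calling HMS, updating the catalog cache) is followed by a markEvent() call on the catalog timeline, so the profile shows where the time went. A minimal sketch of that shape follows. The Timeline class is a hypothetical stand-in for EventSequence rather than Impala's class, the HMS and catalog steps are placeholders, and the point at which "DDL finished" is marked is an assumption, since that code is not part of the hunks above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

final class TimelineSketch {
  /** Hypothetical stand-in for EventSequence: records labels with elapsed time. */
  static final class Timeline {
    private final long startNs = System.nanoTime();
    private final List<String> labels = new ArrayList<>();
    private final List<Long> elapsedNs = new ArrayList<>();

    // Record that a step has completed, with the time elapsed since the start.
    void markEvent(String label) {
      labels.add(label);
      elapsedNs.add(System.nanoTime() - startNs);
    }

    List<String> labels() { return Collections.unmodifiableList(labels); }
    List<Long> elapsedNs() { return Collections.unmodifiableList(elapsedNs); }
  }

  private static final ReentrantLock ddlLock = new ReentrantLock();

  // Same shape as acquireMetastoreDdlLock() in the patch: block, then mark.
  static void acquireDdlLock(Timeline timeline) {
    ddlLock.lock();
    timeline.markEvent("Got metastoreDdlLock");
  }

  // Placeholder DDL: the real HMS and catalog calls would sit before each mark.
  static List<String> createTable(Timeline timeline) {
    acquireDdlLock(timeline);
    try {
      timeline.markEvent("Created table in Metastore");
      timeline.markEvent("Created table in catalog cache");
    } finally {
      ddlLock.unlock();
    }
    // Assumption: the caller marks the end of the DDL after the lock is released.
    timeline.markEvent("DDL finished");
    return timeline.labels();
  }

  public static void main(String[] args) {
    Timeline timeline = new Timeline();
    createTable(timeline);
    for (int i = 0; i < timeline.labels().size(); i++) {
      System.out.println(
          timeline.labels().get(i) + ": " + timeline.elapsedNs().get(i) + "ns");
    }
  }
}
```

Marking after the step rather than before it means each label reads as "this has completed", which matches the past-tense labels the tests above look for.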


[impala] 02/02: IMPALA-11535: Skip older events in the event processor based on the latestRefreshEventID

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b718d63860356a04814e07d91711c3c748b3e769
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Wed Jun 7 16:51:15 2023 -0700

    IMPALA-11535: Skip older events in the event processor based on the
    latestRefreshEventID
    
    Summary: If a table has been manually refreshed, all of its events
    that happened before the manual REFRESH can be skipped. This matters
    when catalogd is lagging behind in processing events. When processing
    an event, we can check whether a manual REFRESH was executed after
    its eventTime. If so, we don't need to process the event to refresh
    anything. This helps catalogd catch up with HMS events quickly.
    
    Implementation details: The lastRefreshEventId on the table or
    partition is updated whenever there is a table- or partition-level
    refresh/load. By comparing the lastRefreshEventId to the current
    event id in the event processor, older events can be skipped.
    
    Set enable_skipping_older_events to true to enable this optimization.
    
    Testing:
    - Added an end-to-end test and a unit test for the functionality.
    
    Change-Id: Ic0dc5c7396d80616680d8a5805ce80db293b72e1
    Reviewed-on: http://gerrit.cloudera.org:8080/20022
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 10 ++++
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../java/org/apache/impala/catalog/HdfsTable.java  | 17 +++---
 .../main/java/org/apache/impala/catalog/Table.java |  2 +-
 .../impala/catalog/events/MetastoreEvents.java     | 57 ++++++++++++++++++-
 .../catalog/events/MetastoreEventsProcessor.java   | 11 ++++
 .../org/apache/impala/service/BackendConfig.java   |  9 +++
 .../apache/impala/service/CatalogOpExecutor.java   | 18 ++++++
 .../events/MetastoreEventsProcessorTest.java       | 44 +++++++++++++++
 tests/custom_cluster/test_events_custom_configs.py | 64 ++++++++++++++++++++++
 11 files changed, 227 insertions(+), 9 deletions(-)
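
Note: the skip decision described in the commit message can be sketched as below. This is an illustration under stated assumptions, not the code in MetastoreEvents.java. CachedObject and canSkip() are hypothetical names, the inclusive comparison (eventId <= lastRefreshEventId) is an assumption, and the "-1 means unavailable" guard that the HdfsTable hunk applies at the call site is folded into the setter here for brevity.

```java
import java.util.concurrent.atomic.AtomicLong;

final class EventSkipSketch {
  /** Hypothetical stand-in for a cached table or partition. */
  static final class CachedObject {
    private final AtomicLong lastRefreshEventId = new AtomicLong(-1L);

    // Ignore -1, which signals that the current HMS event id was unavailable.
    void setLastRefreshEventId(long eventId) {
      if (eventId > -1) lastRefreshEventId.set(eventId);
    }

    long getLastRefreshEventId() { return lastRefreshEventId.get(); }
  }

  // An event is skippable only when the flag is on and the object was
  // refreshed at or after the event (assumed inclusive comparison).
  static boolean canSkip(boolean skippingEnabled, long eventId, CachedObject obj) {
    return skippingEnabled && eventId <= obj.getLastRefreshEventId();
  }

  public static void main(String[] args) {
    CachedObject table = new CachedObject();
    table.setLastRefreshEventId(100L);
    System.out.println(canSkip(true, 90L, table));   // event older than the refresh
    System.out.println(canSkip(true, 101L, table));  // event newer than the refresh
    System.out.println(canSkip(false, 90L, table));  // optimization disabled
  }
}
```

An object that has never been refreshed keeps the initial id of -1, so no real event is skipped for it.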

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 6d2b47c96..bcbd58e1e 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -138,6 +138,16 @@ DEFINE_string(file_metadata_reload_properties, "EXTERNAL, metadata_location,"
     "refresh file metadata when these properties are changed. To skip this optimization,"
     "set the value to empty string");
 
+DEFINE_bool(enable_skipping_older_events, false, "This configuration is used to skip any "
+    "older events in the event processor based on the lastRefreshEventId on the "
+    "database/table/partition in the cache. All the DML queries that change the metadata "
+    "in catalogd's cache will update the lastRefreshEventId, i.e., fetch the latest "
+    "available event on HMS and set it on the object. In case the event processor is "
+    "lagging, the older events in the event processor queue can be skipped by comparing "
+    "the current event id to the lastRefreshEventId. The default is false, which "
+    "disables the optimization. Set this to true to enable skipping the older events "
+    "and quickly catch up with the events of HMS.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index c10f472e4..ae9d61ba9 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -108,6 +108,7 @@ DECLARE_int32(thrift_rpc_max_message_size);
 DECLARE_string(file_metadata_reload_properties);
 DECLARE_string(java_weigher);
 DECLARE_int32(iceberg_reload_new_files_threshold);
+DECLARE_bool(enable_skipping_older_events);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -423,6 +424,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor);
   cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
   cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
+  cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 6cee3ab7a..3943021df 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -260,4 +260,6 @@ struct TBackendGflags {
   114: required bool use_jamm_weigher
 
   115: required i32 iceberg_reload_new_files_threshold
+
+  116: required bool enable_skipping_older_events;
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0d8b6ba80..70ede2d5e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -56,6 +56,7 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
@@ -1927,6 +1928,7 @@ public class HdfsTable extends Table implements FeFsTable {
       }
       partBuilders.add(partBuilder);
     }
+    long latestEventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(client);
     long fileMdLoadTime = loadFileMetadataForPartitions(client, partBuilders,
         /* isRefresh=*/false);
     for (HdfsPartition.Builder p : partBuilders) {
@@ -1935,7 +1937,12 @@ public class HdfsTable extends Table implements FeFsTable {
       } else {
         updatePartition(p);
       }
+      if (latestEventId > -1) {
+        p.setLastRefreshEventId(latestEventId);
+      }
     }
+    LOG.info("Setting the latest refresh event id to {} for the loaded partitions for "
+        + "the table {}", latestEventId, getFullName());
     return fileMdLoadTime;
   }
 
@@ -2895,13 +2902,7 @@ public class HdfsTable extends Table implements FeFsTable {
     FsPermissionCache permissionCache = new FsPermissionCache();
     Map<HdfsPartition.Builder, HdfsPartition> partBuilderToPartitions = new HashMap<>();
     Set<HdfsPartition.Builder> partBuildersFileMetadataRefresh = new HashSet<>();
-    long latestEventId = -1L;
-    try {
-      latestEventId = client.getCurrentNotificationEventId().getEventId();
-    } catch (TException exception) {
-      LOG.warn(String.format("Unable to fetch latest event id from HMS: %s",
-          exception.getMessage()));
-    }
+    long latestEventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(client);
     for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) {
       Partition hmsPartition = entry.getKey();
       HdfsPartition oldPartition = entry.getValue();
@@ -2935,6 +2936,8 @@ public class HdfsTable extends Table implements FeFsTable {
       }
       partBuilderToPartitions.put(partBuilder, oldPartition);
     }
+    LOG.info("Setting the latest refresh event id to {} for the reloaded {} partitions",
+        latestEventId, partBuilderToPartitions.size());
     if (!partBuildersFileMetadataRefresh.isEmpty()) {
       LOG.info("for table {}, file metadataOps: {}, refreshing file metadata for {}"
               + " out of {} partitions to reload in reloadPartitions()", getFullName(),
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2b70bdeeb..2364f82f9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -1048,7 +1048,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     if (eventId > lastRefreshEventId_) {
       lastRefreshEventId_ = eventId;
     }
-    LOG.debug("last refreshed event id for table: {} set to: {}", getFullName(),
+    LOG.info("last refreshed event id for table: {} set to: {}", getFullName(),
         lastRefreshEventId_);
     // TODO: Should we reset lastSyncedEvent Id if it is less than event Id?
     // If we don't reset it - we may start syncing table from an event id which
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index e26acaff4..593a8de79 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1036,6 +1036,38 @@ public class MetastoreEvents {
       }
       return true;
     }
+
+    protected boolean isOlderEvent(Partition partitionEventObj) {
+      if (!BackendConfig.INSTANCE.enableSkippingOlderEvents()) {
+        return false;
+      }
+      org.apache.impala.catalog.Table tbl = null;
+      try {
+        tbl = catalog_.getTable(dbName_, tblName_);
+        if (tbl == null || tbl instanceof IncompleteTable) {
+          return false;
+        }
+        // Always check the lastRefreshEventId on the table first for table level refresh
+        if (tbl.getLastRefreshEventId() > getEventId() || (partitionEventObj != null &&
+            catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_,
+                partitionEventObj, getEventId()))) {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+              .inc(getNumberOfEvents());
+          String messageStr = partitionEventObj == null ? "Skipping the event since the" +
+              " table " + dbName_+ "." + tblName_ + " has last refresh id as " +
+              tbl.getLastRefreshEventId() + ". Comparing it with current event " +
+              getEventId() + ". " : "";
+          infoLog("{}Incremented events skipped counter to {}", messageStr,
+              metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                  .getCount());
+          return true;
+        }
+      } catch (CatalogException e) {
+        debugLog("ignoring exception while checking if it is an older event "
+            + "on table {}.{}", dbName_, tblName_, e);
+      }
+      return false;
+    }
   }
 
   /**
@@ -1293,7 +1325,13 @@ public class MetastoreEvents {
     @Override
     public void process() throws MetastoreNotificationException {
       if (isSelfEvent()) {
-        infoLog("Not processing the event as it is a self-event");
+        infoLog("Not processing the insert event as it is a self-event");
+        return;
+      }
+
+      if (isOlderEvent(insertPartition_)) {
+        infoLog("Not processing the insert event {} as it is an older event",
+            getEventId());
         return;
       }
       // Reload the whole table if it's a transactional table or materialized view.
@@ -1458,6 +1496,12 @@ public class MetastoreEvents {
         return;
       }
 
+      if (isOlderEvent(null)) {
+        infoLog("Not processing the alter table event {} as it is an older event",
+            getEventId());
+        return;
+      }
+
       // Determine whether this is an event which we have already seen or if it is a new
       // event
       if (isSelfEvent()) {
@@ -2153,6 +2197,12 @@ public class MetastoreEvents {
         return;
       }
 
+      if (isOlderEvent(partitionBefore_)) {
+        infoLog("Not processing the alter partition event {} as it is an older event",
+            getEventId());
+        return;
+      }
+
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       if (canBeSkipped()) {
@@ -2291,6 +2341,11 @@ public class MetastoreEvents {
       // isTrivialAlterPartitionEvent() for examples.
       List<T> eventsToProcess = new ArrayList<>();
       for (T event : batchedEvents_) {
+        if (isOlderEvent(event.getPartitionForBatching())) {
+          infoLog("Not processing the current event id {} as it is an older event",
+              event.getEventId());
+          continue;
+        }
         if (!event.canBeSkipped()) {
           eventsToProcess.add(event);
         }
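
The skip decision added in isOlderEvent() above can be modelled in a few lines. This is a minimal sketch, not the Impala implementation: CachedTable and is_older_event are hypothetical names, and the strict ">" comparison at the partition level is an assumption, since the body of isPartitionLoadedAfterEvent() is not part of this diff.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class CachedTable:
    """Hypothetical stand-in for a cached catalog table and its partitions."""
    last_refresh_event_id: int = -1
    # partition name -> event id recorded when the partition was (re)loaded
    partition_refresh_ids: Dict[str, int] = field(default_factory=dict)


def is_older_event(enabled: bool, table: Optional[CachedTable],
                   event_id: int, partition: Optional[str] = None) -> bool:
    """Return True when the event is already reflected in the cache."""
    if not enabled:
        return False  # flag off: never skip
    if table is None:
        return False  # missing or unloaded table: nothing to compare against
    # The table-level check comes first, as in the diff.
    if table.last_refresh_event_id > event_id:
        return True
    # The partition-level check only applies to partition events.
    # Assumption: strictly greater, mirroring the table-level comparison.
    if partition is not None:
        return table.partition_refresh_ids.get(partition, -1) > event_id
    return False
```

With a table refreshed at event 100 and a partition refreshed at event 120, an alter-table event 90 is skipped, while a partition event 110 is skipped only because of the partition-level id.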
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 19b7ddc36..c392ef980 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -715,6 +715,17 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     }
   }
 
+  public static long getCurrentEventIdNoThrow(IMetaStoreClient client) {
+    long latestEventId = -1L;
+    try {
+      latestEventId = client.getCurrentNotificationEventId().getEventId();
+    } catch (TException exception) {
+      LOG.warn(String.format("Unable to fetch latest event id from HMS: %s",
+          exception.getMessage()));
+    }
+    return latestEventId;
+  }
+
   /**
    * Starts the event processor from a given event id
    */
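
The helper above returns -1 instead of throwing, and callers such as HdfsTable only record the id when it is greater than -1. A rough sketch of that sentinel pattern follows; the function names and the fetch callable are hypothetical, and the Python version catches Exception broadly where the Java code catches only TException.

```python
import logging
from typing import Callable, Dict

LOG = logging.getLogger(__name__)
UNKNOWN_EVENT_ID = -1  # sentinel used when HMS cannot be reached


def get_current_event_id_no_throw(fetch: Callable[[], int]) -> int:
    """Return the latest event id, or the sentinel if the fetch fails."""
    try:
        return fetch()
    except Exception as exc:  # the Java helper catches TException only
        LOG.warning("Unable to fetch latest event id from HMS: %s", exc)
        return UNKNOWN_EVENT_ID


def record_refresh(refresh_ids: Dict[str, int], name: str, event_id: int) -> None:
    """Store the id for a partition, ignoring the sentinel value."""
    if event_id > UNKNOWN_EVENT_ID:
        refresh_ids[name] = event_id
```

Leaving the stored id untouched on failure means a failed fetch can never cause an event to be skipped; at worst the event is processed as before.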
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 29352d908..f5c7570bf 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -365,6 +365,15 @@ public class BackendConfig {
     backendCfg_.enable_reload_events = flag;
   }
 
+  public boolean enableSkippingOlderEvents() {
+    return backendCfg_.enable_skipping_older_events;
+  }
+
+  @VisibleForTesting
+  public void setSkippingOlderEvents(boolean flag) {
+    backendCfg_.enable_skipping_older_events = flag;
+  }
+
   public boolean pullTableTypesAndComments() {
     return backendCfg_.pull_table_types_and_comments;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 13d5e4161..256666d1b 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1551,9 +1551,14 @@ public class CatalogOpExecutor {
       String reason, @Nullable String debugAction)
       throws CatalogException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
+    long eventId = -1L;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           getMetaStoreTable(msClient, tbl);
+      if (msTbl.getPartitionKeysSize() == 0) {
+        eventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(
+            msClient.getHiveClient());
+      }
       if (tbl instanceof HdfsTable) {
         ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
             reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate,
@@ -1561,6 +1566,12 @@ public class CatalogOpExecutor {
       } else {
         tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
+      // Update the lastRefreshEventId at the table level if the table is unpartitioned.
+      // For a partitioned table, partitions are updated in HdfsTable#load().
+      if (msTbl.getPartitionKeysSize() == 0 && eventId > tbl.getLastRefreshEventId()
+          && reloadFileMetadata && reloadTableSchema) {
+        tbl.setLastRefreshEventId(eventId);
+      }
     }
     tbl.setCatalogVersion(newCatalogVersion);
   }
@@ -1635,11 +1646,18 @@ public class CatalogOpExecutor {
         LOG.trace(String.format("Altering view %s", tableName));
       }
       applyAlterTable(msTbl);
+      long eventId = -1L;
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        eventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(
+            msClient.getHiveClient());
         tbl.load(true, msClient.getHiveClient(), msTbl, "ALTER VIEW");
       }
       addSummary(resp, "View has been altered.");
       tbl.setCatalogVersion(newCatalogVersion);
+      // Update the last refresh event id at table level
+      if (eventId > tbl.getLastRefreshEventId()) {
+        tbl.setLastRefreshEventId(eventId);
+      }
       addTableToCatalogUpdate(tbl, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
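
In both hunks above the event id is read before tbl.load() and applied afterwards, and only if it is larger than the stored value. One plausible reading, which the commit itself does not state, is that an event arriving while the load is in progress must not be marked as already seen. The sketch below illustrates that ordering with hypothetical FakeHms and Table classes.

```python
from typing import Callable


class FakeHms:
    """Hypothetical metastore that only hands out increasing event ids."""

    def __init__(self) -> None:
        self.current_event_id = 0

    def emit_event(self) -> int:
        self.current_event_id += 1
        return self.current_event_id


class Table:
    """Hypothetical cached table with a monotonic lastRefreshEventId."""

    def __init__(self) -> None:
        self.last_refresh_event_id = -1

    def set_last_refresh_event_id(self, event_id: int) -> None:
        # Only ever move forward, as Table.setLastRefreshEventId() does.
        if event_id > self.last_refresh_event_id:
            self.last_refresh_event_id = event_id


def reload_table(table: Table, hms: FakeHms, load: Callable[[], None]) -> None:
    event_id = hms.current_event_id  # captured BEFORE the load starts
    load()                           # HMS may emit new events meanwhile
    table.set_last_refresh_event_id(event_id)
```

If five events exist and a sixth is emitted during the load, the table ends up with id 5, so event 6 is still processed while events 1 to 4 can be skipped.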
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index e71e3afe6..bcf75c16d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -3207,6 +3207,50 @@ public class MetastoreEventsProcessorTest {
         catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals2));
   }
 
+  /**
+   * Verifies that lastRefreshEventId is updated after a table is refreshed.
+   * The event processor relies on this id to skip older events.
+   * @throws Exception
+   */
+  @Test
+  public void testSkippingOlderEvents() throws Exception {
+    BackendConfig.INSTANCE.setEnableSyncToLatestEventOnDdls(true);
+    BackendConfig.INSTANCE.setSkippingOlderEvents(true);
+    createDatabase(TEST_DB_NAME, null);
+    final String testTblName = "testSkippingOlderEvents";
+    createTable(testTblName, true);
+    eventsProcessor_.processEvents();
+    AlterTableExecutor hiveExecutor = new HiveAlterTableExecutor(TEST_DB_NAME,
+        testTblName);
+    hiveExecutor.execute();
+    HdfsTable testTbl = (HdfsTable)catalog_.getOrLoadTable(TEST_DB_NAME, testTblName,
+        "test", null);
+    long lastSyncEventIdBefore = testTbl.getLastRefreshEventId();
+    alterTableAddParameter(testTblName, "somekey", "someval");
+    eventsProcessor_.processEvents();
+    assertEquals(testTbl.getLastRefreshEventId(), eventsProcessor_.getCurrentEventId());
+    assertTrue(testTbl.getLastRefreshEventId() > lastSyncEventIdBefore);
+    confirmTableIsLoaded(TEST_DB_NAME, testTblName);
+    final String testUnpartTblName = "testUnPartSkippingOlderEvents";
+    createTable(testUnpartTblName, false);
+    testInsertEvents(TEST_DB_NAME, testUnpartTblName, false);
+    testTbl = (HdfsTable)catalog_.getOrLoadTable(TEST_DB_NAME, testUnpartTblName,
+        "test", null);
+    eventsProcessor_.processEvents();
+    assertEquals(testTbl.getLastRefreshEventId(), eventsProcessor_.getCurrentEventId());
+    confirmTableIsLoaded(TEST_DB_NAME, testUnpartTblName);
+    // Verify older HMS events are skipped by doing refresh in Impala
+    alterTableAddCol(testUnpartTblName, "newCol", "string", "test new column");
+    testTbl = (HdfsTable)catalog_.getOrLoadTable(TEST_DB_NAME, testUnpartTblName,
+        "test", null);
+    lastSyncEventIdBefore = testTbl.getLastRefreshEventId();
+    catalog_.reloadTable(testTbl, "test");
+    eventsProcessor_.processEvents();
+    assertEquals(testTbl.getLastRefreshEventId(), eventsProcessor_.getCurrentEventId());
+    assertTrue(testTbl.getLastRefreshEventId() > lastSyncEventIdBefore);
+    confirmTableIsLoaded(TEST_DB_NAME, testUnpartTblName);
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 43d77bac9..e37c45ae6 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -311,6 +311,70 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     EventProcessorUtils.wait_for_event_processing(self)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=5"
+                  " --enable_skipping_older_events=true"
+                  " --enable_sync_to_latest_event_on_ddls=true")
+  def test_skipping_older_events(self, unique_database):
+    """Verifies IMPALA-11535: the event processor should skip an event when its event
+    id is older than the lastRefreshEventId on the table/partition.
+    """
+    test_old_table = "test_old_table"
+
+    def verify_skipping_older_events(table_name, is_transactional, is_partitioned):
+      query = " ".join(["create", "transactional" if is_transactional else '',
+        "table {}.{} (i int)", "partitioned by (year int)" if is_partitioned else ''])
+      self.run_stmt_in_hive(query.format(unique_database, table_name))
+      values = "values (10),(20),(30)"
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      def verify_skipping_hive_stmt_events(stmt, new_table_name):
+        tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+        self.run_stmt_in_hive(stmt)
+        self.client.execute(
+          "refresh {}.{}".format(unique_database, new_table_name))
+        tables_refreshed_before = EventProcessorUtils.get_int_metric("tables-refreshed")
+        partitions_refreshed_before = \
+          EventProcessorUtils.get_int_metric("partitions-refreshed")
+        EventProcessorUtils.wait_for_event_processing(self)
+        tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+        assert tbl_events_skipped_after > tbl_events_skipped_before
+        tables_refreshed_after = EventProcessorUtils.get_int_metric("tables-refreshed")
+        partitions_refreshed_after = \
+          EventProcessorUtils.get_int_metric("partitions-refreshed")
+        if is_partitioned:
+          assert partitions_refreshed_after == partitions_refreshed_before
+        else:
+          assert tables_refreshed_after == tables_refreshed_before
+
+      # test single insert event
+      query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)"
+        if is_partitioned else '', values])
+      verify_skipping_hive_stmt_events(
+        query.format(unique_database, table_name), table_name)
+      # test batch insert events
+      query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)"
+        if is_partitioned else '', values, ";"])
+      complete_query = ""
+      for _ in range(3):
+        complete_query += query.format(unique_database, table_name)
+      verify_skipping_hive_stmt_events(complete_query, table_name)
+      # Dynamic partitions test
+      query = " ".join(["create", "transactional" if is_transactional else '',
+        "table `{}`.`{}` (i int)",
+        "partitioned by (year int)" if is_partitioned else '', ";"])
+      complete_query = query.format(unique_database, "new_table")
+      complete_query += "insert overwrite table `{db}`.`{tbl1}` " \
+        "select * from `{db}`.`{tbl2}`"\
+        .format(db=unique_database, tbl1="new_table", tbl2=table_name)
+      verify_skipping_hive_stmt_events(complete_query, "new_table")
+      # Drop the tables before running another test
+      self.client.execute("drop table {}.{}".format(unique_database, table_name))
+      self.client.execute("drop table {}.{}".format(unique_database, "new_table"))
+    verify_skipping_older_events(test_old_table, False, False)
+    verify_skipping_older_events(test_old_table, True, False)
+    verify_skipping_older_events(test_old_table, False, True)
+    verify_skipping_older_events(test_old_table, True, True)
 
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_commit_compaction_events(self, unique_database):