Posted to commits@impala.apache.org by st...@apache.org on 2023/08/30 05:24:40 UTC

[impala] 01/02: IMPALA-12024: Add catalog profile for createTable

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 27d37a60c954d7734cfaea35e73939d5c7681d55
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Aug 16 10:59:47 2023 +0800

    IMPALA-12024: Add catalog profile for createTable
    
    DDL/DML operations might spend a long time executing in catalogd.
    Currently, their profiles only have a CatalogOpExecTimer counter. We
    need more items to show the details of how they are executed in
    catalogd.
    
    As a first step, this patch adds a profile timeline showing how the
    createTable DDL is executed in the Catalog Server. It also adds events
    to the existing "Query Timeline" for when the CatalogDdlRequest
    finishes and when the catalog updates are applied.
    
    To implement this, a 'profile' field is added to TDdlExecResponse to
    carry the execution counters and timeline from catalogd. Currently, we
    only use its timeline; more counters can be added in the future.
    Several methods gain a timeline parameter so they can mark their
    progress, and timeline events are added after each RPC finishes.
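
    A condensed sketch of that flow in catalogd (simplified from the
    CatalogOpExecutor hunk below; imports and error handling omitted):

        // Record progress while executing the DDL in catalogd.
        EventSequence catalogTimeline = new EventSequence("Catalog Server Operation");
        // ... each step marks an event once its RPC returns, e.g.:
        catalogTimeline.markEvent("Created table in Metastore");
        catalogTimeline.markEvent("DDL finished");
        // Ship the timeline back in the new 'profile' field of the response.
        TRuntimeProfileNode profile = Frontend.createTRuntimeProfileNode("CatalogOp");
        profile.addToEvent_sequences(catalogTimeline.toThrift());
        response.setProfile(profile);

    The coordinator then copies each returned event sequence into the
    query's summary profile (see the ClientRequestState change below).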
    
    Here is an example where HMS hangs for 26s during a CTAS. I used gdb to
    attach to HMS, as mentioned in the JIRA description. In the timeline,
    we can see the time is spent in the first HMS RPC, which fetches the
    current HMS event id:
        Catalog Server Operation: 26s560ms
           - Got metastoreDdlLock: 163.977us (163.977us)
           - Got Metastore client: 166.339us (2.362us)
           - Got current Metastore event id 8355270: 26s494ms (26s494ms)
           - Created table in Metastore: 26s558ms (63.507ms)
           - Fetched event batch from Metastore: 26s559ms (1.155ms)
           - Created table in catalog cache: 26s560ms (1.164ms)
           - DDL finished: 26s560ms (65.538us)
        Query Compilation: 164.257ms
           - Metadata of all 1 tables cached: 10.535ms (10.535ms)
           - Analysis finished: 118.324ms (107.788ms)
           - Authorization finished (noop): 118.489ms (164.626us)
           - Value transfer graph computed: 118.830ms (341.792us)
           - Single node plan created: 150.150ms (31.319ms)
           - Runtime filters computed: 150.254ms (103.529us)
           - Distributed plan created: 151.832ms (1.578ms)
           - Planning finished: 164.257ms (12.425ms)
        Query Timeline: 27s304ms
           - Query submitted: 129.658us (129.658us)
           - Planning finished: 170.224ms (170.095ms)
           - CatalogDdlRequest finished: 26s731ms (26s561ms)
           - Applied catalog updates from DDL: 26s740ms (8.752ms)
           - Submit for admission: 26s740ms (22.233us)
           - Completed admission: 26s740ms (286.295us)
           - Ready to start on 3 backends: 26s740ms (155.916us)
           - All 3 execution backends (3 fragment instances) started: 26s751ms (10.864ms)
           - Last row fetched: 26s920ms (168.226ms)
           - Released admission control resources: 26s920ms (27.635us)
           - DML data written: 26s920ms (126.369us)
           - Applied catalog updates from DDL: 26s985ms (65.354ms)
           - DML Metastore update finished: 26s985ms (30.343us)
           - Rows available: 26s985ms (27.050us)
           - Unregister query: 27s304ms (318.661ms)
    
    An example of creating a Kudu table:
        Catalog Server Operation: 1s730ms
           - Got Metastore client: 113.403us (113.403us)
           - Got current Metastore event id 8355276: 974.500us (861.097us)
           - Got Kudu client: 212.123ms (211.148ms)
           - Got kuduDdlLock: 212.128ms (4.680us)
           - Checked table existence in Kudu: 850.786ms (638.658ms)
           - Created table in Kudu: 1s623ms (772.379ms)
           - Got metastoreDdlLock: 1s623ms (397.305us)
           - Got Metastore client: 1s623ms (7.813us)
           - Checked table existence in Metastore: 1s648ms (25.154ms)
           - Created table in Metastore: 1s725ms (76.348ms)
           - Fetched event batch from Metastore: 1s728ms (3.004ms)
           - Created table in catalog cache: 1s730ms (2.024ms)
           - DDL finished: 1s730ms (84.448us)
    
    An example of creating an Iceberg table:
        Catalog Server Operation: 1s573ms
           - Got Metastore client: 141.799us (141.799us)
           - Checked table existence in Metastore: 2.957ms (2.815ms)
           - Got current Metastore event id 8355277: 3.669ms (712.475us)
           - Created table using Iceberg Catalog HIVE_CATALOG: 1s379ms (1s375ms)
           - Fetched event batch from Metastore: 1s381ms (2.188ms)
           - Created table in catalog cache: 1s382ms (1.556ms)
           - Set Iceberg table owner in Metastore: 1s573ms (190.262ms)
           - DDL finished: 1s573ms (59.176us)
    
    Tests:
     - Add e2e tests to verify that the DDL timeline events exist in the profile
    
    Change-Id: I3ebf591625e71391a5b23f56ddca8f0ae97b1efa
    Reviewed-on: http://gerrit.cloudera.org:8080/20368
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |  17 +-
 be/src/service/impala-server.cc                    |  29 +++-
 be/src/service/impala-server.h                     |  10 +-
 common/thrift/CatalogService.thrift                |   4 +
 .../java/org/apache/impala/catalog/Catalog.java    |  10 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 186 +++++++++++++++------
 .../java/org/apache/impala/service/Frontend.java   |   2 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  67 ++++++--
 tests/query_test/test_observability.py             |  60 +++++++
 9 files changed, 306 insertions(+), 79 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 0f9764201..0d48c921e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -705,11 +705,20 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   DebugActionNoFail(exec_request_->query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
 
   Status status = catalog_op_executor_->Exec(exec_request_->catalog_op_request);
+  query_events_->MarkEvent("CatalogDdlRequest finished");
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
   }
 
+  if (catalog_op_executor_->ddl_exec_response() != nullptr &&
+      catalog_op_executor_->ddl_exec_response()->__isset.profile) {
+    for (const TEventSequence& catalog_timeline :
+        catalog_op_executor_->ddl_exec_response()->profile.event_sequences) {
+      summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline);
+    }
+  }
+
   // If this is a CTAS request, there will usually be more work to do
   // after executing the CREATE TABLE statement (the INSERT portion of the operation).
   // The exception is if the user specified IF NOT EXISTS and the table already
@@ -725,7 +734,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   // Add newly created table to catalog cache.
   status = parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl, query_options());
+      exec_request_->query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -842,7 +851,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   status = parent_server_->ProcessCatalogUpdateResult(
       resp.result,
-      exec_request_->query_options.sync_ddl, query_options());
+      exec_request_->query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -1597,7 +1606,7 @@ Status ClientRequestState::UpdateCatalog() {
         query_events_->MarkEvent("Transaction committed");
       }
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
-          exec_request_->query_options.sync_ddl, query_options()));
+          exec_request_->query_options.sync_ddl, query_options(), query_events_));
     }
   } else if (InKuduTransaction()) {
     // Commit the Kudu transaction. Clear transaction state if it's successful.
@@ -1757,7 +1766,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
   RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl, query_options()));
+      exec_request_->query_options.sync_ddl, query_options(), query_events_));
 
   // Set the results to be reported to the client.
   SetResultSet(catalog_op_executor_->ddl_exec_response());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d054a5413..1a6863fd7 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2192,7 +2192,7 @@ void ImpalaServer::CatalogUpdateCallback(
 }
 
 void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
-    const TUniqueId& catalog_service_id) {
+    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   // Wait for the update to be processed locally.
   VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
@@ -2203,14 +2203,17 @@ void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    timeline->MarkEvent("Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
+    timeline->MarkEvent(Substitute("Received catalog version $0",
+        catalog_update_version));
     VLOG_QUERY << "Received catalog version: " << catalog_update_version;
   }
 }
 
 void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
-    const TUniqueId& catalog_service_id) {
+    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   int64_t min_req_subscriber_topic_version =
       catalog_update_info_.catalog_topic_version;
@@ -2223,15 +2226,18 @@ void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    timeline->MarkEvent("Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
+    timeline->MarkEvent(Substitute("Received min subscriber topic version $0",
+        min_req_subscriber_topic_version));
     VLOG_QUERY << "Received min subscriber topic version: "
         << min_req_subscriber_topic_version;
   }
 }
 
 void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_version,
-    const TUniqueId& catalog_service_id) {
+    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   int64_t catalog_object_version_lower_bound =
       catalog_update_info_.catalog_object_version_lower_bound;
@@ -2247,8 +2253,11 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    timeline->MarkEvent("Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
+    timeline->MarkEvent(Substitute("Local min catalog version reached $0",
+        min_req_catalog_object_version));
     VLOG_QUERY << "Updated catalog object version lower bound: "
         << min_req_catalog_object_version;
   }
@@ -2256,7 +2265,7 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
 
 Status ImpalaServer::ProcessCatalogUpdateResult(
     const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers,
-    const TQueryOptions& query_options) {
+    const TQueryOptions& query_options, RuntimeProfile::EventSequence* timeline) {
   const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
   if (!catalog_update_result.__isset.updated_catalog_objects &&
       !catalog_update_result.__isset.removed_catalog_objects) {
@@ -2264,15 +2273,16 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     // 'catalog_update_result' to determine when the effects of this operation
     // have been applied to the local catalog cache.
     if (catalog_update_result.is_invalidate) {
-      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id);
+      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id,
+          timeline);
     } else {
-      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id, timeline);
     }
     if (wait_for_all_subscribers) {
       // Now wait for this update to be propagated to all catalog topic subscribers.
       // If we make it here it implies the first condition was met (the update was
       // processed locally or the catalog service id has changed).
-      WaitForCatalogUpdateTopicPropagation(catalog_service_id);
+      WaitForCatalogUpdateTopicPropagation(catalog_service_id, timeline);
     }
   } else {
     TUniqueId cur_service_id;
@@ -2304,6 +2314,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       // Apply the changes to the local catalog cache.
       TUpdateCatalogCacheResponse resp;
       Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
+      timeline->MarkEvent("Applied catalog updates from DDL");
       if (!status.ok()) LOG(ERROR) << status.GetDetail();
       RETURN_IF_ERROR(status);
     } else {
@@ -2336,10 +2347,10 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     if (!wait_for_all_subscribers) return Status::OK();
     // Wait until we receive and process the catalog update that covers the effects
     // (catalog objects) of this operation.
-    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id, timeline);
     // Now wait for this update to be propagated to all catalog topic
     // subscribers.
-    WaitForCatalogUpdateTopicPropagation(catalog_service_id);
+    WaitForCatalogUpdateTopicPropagation(catalog_service_id, timeline);
   }
   return Status::OK();
 }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index cc3ff88c6..61f593a7c 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -423,24 +423,26 @@ class ImpalaServer : public ImpalaServiceIf,
   ///
   /// 'query_options' is used for running debug actions.
   Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
-      bool wait_for_all_subscribers, const TQueryOptions& query_options)
+      bool wait_for_all_subscribers, const TQueryOptions& query_options,
+      RuntimeProfile::EventSequence* timeline)
       WARN_UNUSED_RESULT;
 
   /// Wait until the catalog update with version 'catalog_update_version' is
   /// received and applied in the local catalog cache or until the catalog
   /// service id has changed.
   void WaitForCatalogUpdate(const int64_t catalog_update_version,
-      const TUniqueId& catalog_service_id);
+      const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline);
 
   /// Wait until the minimum catalog object version in the local cache is
   /// greater than 'min_catalog_update_version' or until the catalog
   /// service id has changed.
   void WaitForMinCatalogUpdate(const int64_t min_catalog_update_version,
-      const TUniqueId& catalog_service_id);
+      const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline);
 
   /// Wait until the last applied catalog update has been broadcast to
   /// all coordinators or until the catalog service id has changed.
-  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id);
+  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id,
+      RuntimeProfile::EventSequence* timeline);
 
   /// Returns true if lineage logging is enabled, false otherwise.
   ///
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index bb3482cec..f309591f2 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -23,6 +23,7 @@ include "JniCatalog.thrift"
 include "Types.thrift"
 include "Status.thrift"
 include "Results.thrift"
+include "RuntimeProfile.thrift"
 include "hive_metastore.thrift"
 include "SqlConstraints.thrift"
 
@@ -209,6 +210,9 @@ struct TDdlExecResponse {
   // created table. This is useful for establishing lineage between table and it's
   // location for external tables.
   6: optional string table_location
+
+  // Profile of the DDL execution in catalogd
+  7: optional RuntimeProfile.TRuntimeProfileNode profile
 }
 
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 62c3c92ae..78aae22d1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -49,6 +49,7 @@ import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.PatternMatcher;
 
 import com.google.common.base.Joiner;
@@ -396,6 +397,15 @@ public abstract class Catalog implements AutoCloseable {
    */
   public MetaStoreClient getMetaStoreClient() { return metaStoreClientPool_.getClient(); }
 
+  /**
+   * Same as the above but also update the given 'timeline'.
+   */
+  public MetaStoreClient getMetaStoreClient(EventSequence timeline) {
+    MetaStoreClient client = getMetaStoreClient();
+    if (timeline != null) timeline.markEvent("Got Metastore client");
+    return client;
+  }
+
   /**
    * Return all members of 'candidates' that match 'matcher'.
    * The results are sorted in String.CASE_INSENSITIVE_ORDER.
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f569dfbd8..13d5e4161 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -228,6 +228,7 @@ import org.apache.impala.thrift.TResetMetadataResponse;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
@@ -245,6 +246,7 @@ import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.DebugUtils;
+import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
@@ -366,6 +368,27 @@ public class CatalogOpExecutor {
   // Table capabilities property name
   public static final String CAPABILITIES_KEY = "OBJCAPABILITIES";
 
+  // Labels used in catalog timelines
+  private static final String CHECKED_HMS_TABLE_EXISTENCE =
+      "Checked table existence in Metastore";
+  private static final String CREATED_HMS_TABLE = "Created table in Metastore";
+  private static final String CREATED_CATALOG_TABLE = "Created table in catalog cache";
+  private static final String CREATED_ICEBERG_TABLE =
+      "Created table using Iceberg Catalog ";
+  private static final String DDL_FINISHED = "DDL finished";
+  private static final String FETCHED_LATEST_HMS_EVENT_ID =
+      "Got current Metastore event id ";
+  private static final String FETCHED_HMS_EVENT_BATCH =
+      "Fetched event batch from Metastore";
+  private static final String FETCHED_HMS_TABLE = "Fetched table from Metastore";
+  private static final String GOT_METASTORE_DDL_LOCK = "Got metastoreDdlLock";
+  private static final String GOT_TABLE_WRITE_LOCK = "Got table write lock";
+  private static final String LOADED_ICEBERG_TABLE = "Loaded iceberg table";
+  private static final String SENT_CATALOG_FOR_SYNC_DDL =
+      "Sent catalog update for sync_ddl";
+  private static final String SET_ICEBERG_TABLE_OWNER =
+      "Set Iceberg table owner in Metastore";
+
   private final CatalogServiceCatalog catalog_;
   private final AuthorizationConfig authzConfig_;
   private final AuthorizationManager authzManager_;
@@ -395,6 +418,7 @@ public class CatalogOpExecutor {
 
   public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
       throws ImpalaException {
+    EventSequence catalogTimeline = new EventSequence("Catalog Server Operation");
     TDdlExecResponse response = new TDdlExecResponse();
     response.setResult(new TCatalogUpdateResult());
     response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
@@ -443,28 +467,29 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(create_table_as_select_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
           response.setNew_table_created(createTable(create_table_as_select_params,
-              response, syncDdl, wantMinimalResult));
+              response, catalogTimeline, syncDdl, wantMinimalResult));
           break;
         case CREATE_TABLE:
           TCreateTableParams create_table_params = ddlRequest.getCreate_table_params();
           tTableName = Optional.of((create_table_params.getTable_name()));
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createTable(ddlRequest.getCreate_table_params(), response, syncDdl,
-              wantMinimalResult);
+          createTable(ddlRequest.getCreate_table_params(), response, catalogTimeline,
+              syncDdl, wantMinimalResult);
           break;
         case CREATE_TABLE_LIKE:
           TCreateTableLikeParams create_table_like_params =
               ddlRequest.getCreate_table_like_params();
           tTableName = Optional.of(create_table_like_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createTableLike(create_table_like_params, response, syncDdl, wantMinimalResult);
+          createTableLike(create_table_like_params, response, catalogTimeline, syncDdl,
+              wantMinimalResult);
           break;
         case CREATE_VIEW:
           TCreateOrAlterViewParams create_view_params =
               ddlRequest.getCreate_view_params();
           tTableName = Optional.of(create_view_params.getView_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createView(create_view_params, wantMinimalResult, response);
+          createView(create_view_params, wantMinimalResult, response, catalogTimeline);
           break;
         case CREATE_FUNCTION:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -559,6 +584,7 @@ public class CatalogOpExecutor {
           throw new IllegalStateException(
               "Unexpected DDL exec request type: " + ddl_type);
       }
+      catalogTimeline.markEvent(DDL_FINISHED);
 
       // If SYNC_DDL is set, set the catalog update that contains the results of this DDL
       // operation. The version of this catalog update is returned to the requesting
@@ -567,8 +593,12 @@ public class CatalogOpExecutor {
       if (syncDdl) {
         response.getResult().setVersion(
             catalog_.waitForSyncDdlVersion(response.getResult()));
+        catalogTimeline.markEvent(SENT_CATALOG_FOR_SYNC_DDL);
       }
 
+      TRuntimeProfileNode profile = Frontend.createTRuntimeProfileNode("CatalogOp");
+      profile.addToEvent_sequences(catalogTimeline.toThrift());
+      response.setProfile(profile);
       // At this point, the operation is considered successful. If any errors occurred
       // during execution, this function will throw an exception and the CatalogServer
       // will handle setting a bad status code.
@@ -2054,9 +2084,22 @@ public class CatalogOpExecutor {
    */
   private List<NotificationEvent> getNextMetastoreEventsIfEnabled(long eventId,
       NotificationFilter eventsFilter) throws MetastoreNotificationException {
+    return getNextMetastoreEventsIfEnabled(null, eventId, eventsFilter);
+  }
+
+  /**
+   * Same as the above but also update the given 'catalogTimeline'.
+   */
+  private List<NotificationEvent> getNextMetastoreEventsIfEnabled(
+      EventSequence catalogTimeline, long eventId, NotificationFilter eventsFilter)
+      throws MetastoreNotificationException {
     if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
-    return MetastoreEventsProcessor
+    List<NotificationEvent> events = MetastoreEventsProcessor
         .getNextMetastoreEventsInBatches(catalog_, eventId, eventsFilter);
+    if (catalogTimeline != null) {
+      catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
+    }
+    return events;
   }
 
   /**
@@ -2230,8 +2273,20 @@ public class CatalogOpExecutor {
    * Returns the latest notification event id from the Hive metastore.
    */
   private long getCurrentEventId(MetaStoreClient msClient) throws ImpalaRuntimeException {
+    return getCurrentEventId(msClient, null);
+  }
+
+  /**
+   * Same as the above but also update the given 'catalogTimeline'.
+   */
+  private long getCurrentEventId(MetaStoreClient msClient, EventSequence catalogTimeline)
+      throws ImpalaRuntimeException {
     try {
-      return msClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+      long id = msClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+      if (catalogTimeline != null) {
+        catalogTimeline.markEvent(FETCHED_LATEST_HMS_EVENT_ID + id);
+      }
+      return id;
     } catch (TException e) {
       throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
           "getCurrentNotificationEventId") + e
@@ -3290,7 +3345,8 @@ public class CatalogOpExecutor {
    * otherwise.
    */
   private boolean createTable(TCreateTableParams params, TDdlExecResponse response,
-      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
+      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult)
+      throws ImpalaException {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
@@ -3306,6 +3362,7 @@ public class CatalogOpExecutor {
       LOG.trace("Skipping table creation because {} already exists and " +
           "IF NOT EXISTS was specified.", tableName);
       tryWriteLock(existingTbl);
+      catalogTimeline.markEvent(GOT_TABLE_WRITE_LOCK);
       try {
         if (syncDdl) {
           // When SYNC_DDL is enabled and the table already exists, we force a version
@@ -3334,18 +3391,18 @@ public class CatalogOpExecutor {
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.trace("Creating table {}", tableName);
     if (KuduTable.isKuduTable(tbl)) {
-      return createKuduTable(tbl, params, wantMinimalResult, response);
+      return createKuduTable(tbl, params, wantMinimalResult, response, catalogTimeline);
     } else if (IcebergTable.isIcebergTable(tbl)) {
-      return createIcebergTable(tbl, wantMinimalResult, response, params.if_not_exists,
-          params.getColumns(), params.getPartition_spec(), params.getTable_properties(),
-          params.getComment());
+      return createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
+          params.if_not_exists, params.getColumns(), params.getPartition_spec(),
+          params.getTable_properties(), params.getComment());
     }
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
     MetastoreShim.setTableLocation(catalog_.getDb(tbl.getDbName()), tbl);
     return createTable(tbl, params.if_not_exists, params.getCache_op(),
         params.server_name, params.getPrimary_keys(), params.getForeign_keys(),
-        wantMinimalResult, response);
+        wantMinimalResult, response, catalogTimeline);
   }
 
   /**
@@ -3453,42 +3510,44 @@ public class CatalogOpExecutor {
    * table was created as part of this call, false otherwise.
    */
   private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse response)
-      throws ImpalaException {
+      TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse response,
+      EventSequence catalogTimeline) throws ImpalaException {
     Preconditions.checkState(KuduTable.isKuduTable(newTable));
     boolean createHMSTable;
     long eventId;
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      eventId = getCurrentEventId(msClient);
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
+      eventId = getCurrentEventId(msClient, catalogTimeline);
     }
     if (!KuduTable.isSynchronizedTable(newTable)) {
       // if this is not a synchronized table, we assume that the table must be existing
       // in kudu and use the column spec from Kudu
-      KuduCatalogOpExecutor.populateExternalTableColsFromKudu(newTable);
+      KuduCatalogOpExecutor.populateExternalTableColsFromKudu(catalogTimeline, newTable);
       createHMSTable = true;
     } else {
       // if this is a synchronized table (managed or external.purge table) then we
       // create it in Kudu first
-      KuduCatalogOpExecutor.createSynchronizedTable(newTable, params);
+      KuduCatalogOpExecutor.createSynchronizedTable(catalogTimeline, newTable, params);
       createHMSTable = !isKuduHmsIntegrationEnabled(newTable);
     }
     try {
       // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
-      List<NotificationEvent> events = Collections.EMPTY_LIST;
-      getMetastoreDdlLock().lock();
+      List<NotificationEvent> events = Collections.emptyList();
+      acquireMetastoreDdlLock(catalogTimeline);
       if (createHMSTable) {
-        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
           boolean tableInMetastore =
               msClient.getHiveClient().tableExists(newTable.getDbName(),
                                                    newTable.getTableName());
+          catalogTimeline.markEvent(CHECKED_HMS_TABLE_EXISTENCE);
           if (!tableInMetastore) {
             msClient.getHiveClient().createTable(newTable);
+            catalogTimeline.markEvent(CREATED_HMS_TABLE);
           } else {
             addSummary(response, "Table already exists.");
             return false;
           }
-          events = getNextMetastoreEventsIfEnabled(eventId,
+          events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId,
                   event -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
                       .equals(event.getEventType())
                       && newTable.getDbName().equalsIgnoreCase(event.getDbName())
@@ -3505,12 +3564,13 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTable =
           eventTblPair == null ? null : eventTblPair.second;
       setTableNameAndCreateTimeInResponse(msTable,
-          newTable.getDbName(), newTable.getTableName(), response);
+          newTable.getDbName(), newTable.getTableName(), response, catalogTimeline);
 
       // Add the table to the catalog cache
       Table newTbl = catalog_
           .addIncompleteTable(newTable.getDbName(), newTable.getTableName(),
               TImpalaTableType.TABLE, params.getComment(), createEventId);
+      catalogTimeline.markEvent(CREATED_CATALOG_TABLE);
       LOG.debug("Created a Kudu table {} with create event id {}", newTbl.getFullName(),
           createEventId);
       addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
@@ -3551,26 +3611,28 @@ public class CatalogOpExecutor {
   private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
       boolean if_not_exists, THdfsCachingOp cacheOp, String serverName,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
-      boolean wantMinimalResult, TDdlExecResponse response) throws ImpalaException {
+      boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline)
+      throws ImpalaException {
     Preconditions.checkState(!KuduTable.isKuduTable(newTable));
-    getMetastoreDdlLock().lock();
+    acquireMetastoreDdlLock(catalogTimeline);
     try {
       org.apache.hadoop.hive.metastore.api.Table msTable;
       Pair<Long, org.apache.hadoop.hive.metastore.api.Table> eventIdTblPair = null;
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        long eventId = getCurrentEventId(msClient);
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
+        long eventId = getCurrentEventId(msClient, catalogTimeline);
         if (primaryKeys == null && foreignKeys == null) {
           msClient.getHiveClient().createTable(newTable);
         } else {
           MetastoreShim.createTableWithConstraints(
               msClient.getHiveClient(), newTable,
-              primaryKeys == null ? new ArrayList<>() : primaryKeys,
-              foreignKeys == null ? new ArrayList<>() : foreignKeys);
+              primaryKeys == null ? Collections.emptyList() : primaryKeys,
+              foreignKeys == null ? Collections.emptyList() : foreignKeys);
         }
-
+        catalogTimeline.markEvent(CREATED_HMS_TABLE);
         addSummary(response, "Table has been created.");
         final org.apache.hadoop.hive.metastore.api.Table finalNewTable = newTable;
-        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(eventId,
+        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
+            catalogTimeline, eventId,
             notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
                 .equals(notificationEvent.getEventType())
                 && finalNewTable.getDbName()
@@ -3583,10 +3645,11 @@ public class CatalogOpExecutor {
           // not atomic.
           eventIdTblPair = new Pair<>(-1L, msClient.getHiveClient()
               .getTable(newTable.getDbName(), newTable.getTableName()));
+          catalogTimeline.markEvent(FETCHED_HMS_TABLE);
         }
         msTable = eventIdTblPair.second;
         setTableNameAndCreateTimeInResponse(msTable, newTable.getDbName(),
-            newTable.getTableName(), response);
+            newTable.getTableName(), response, catalogTimeline);
         // For external tables set table location needed for lineage generation.
         if (newTable.getTableType() == TableType.EXTERNAL_TABLE.toString()) {
           String tableLocation = newTable.getSd().getLocation();
@@ -3611,6 +3674,7 @@ public class CatalogOpExecutor {
           MetadataOp.getTableComment(msTable),
           eventIdTblPair.first);
       Preconditions.checkNotNull(newTbl);
+      catalogTimeline.markEvent(CREATED_CATALOG_TABLE);
       LOG.debug("Created catalog table {} with create event id {}", newTbl.getFullName(),
           eventIdTblPair.first);
       // Submit the cache request and update the table metadata.
@@ -3651,10 +3715,12 @@ public class CatalogOpExecutor {
    */
   private void setTableNameAndCreateTimeInResponse(
       org.apache.hadoop.hive.metastore.api.Table msTable, String dbName, String tblName,
-      TDdlExecResponse response) throws org.apache.thrift.TException {
+      TDdlExecResponse response, EventSequence catalogTimeline)
+      throws org.apache.thrift.TException {
     if (msTable == null) {
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
         msTable = msClient.getHiveClient().getTable(dbName, tblName);
+        catalogTimeline.markEvent(FETCHED_HMS_TABLE + " to get create time");
       }
     }
     response.setTable_name(dbName + "." + tblName);
@@ -3667,9 +3733,9 @@ public class CatalogOpExecutor {
    * exceptions encountered during the create.
    */
   private void createView(TCreateOrAlterViewParams params, boolean wantMinimalResult,
-      TDdlExecResponse response) throws ImpalaException {
+      TDdlExecResponse response, EventSequence catalogTimeline) throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getView_name());
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
+    Preconditions.checkState(tableName.isFullyQualified());
     Preconditions.checkState(params.getColumns() != null &&
         params.getColumns().size() > 0,
           "Null or empty column list given as argument to DdlExecutor.createView");
@@ -3687,8 +3753,9 @@ public class CatalogOpExecutor {
         new org.apache.hadoop.hive.metastore.api.Table();
     setCreateViewAttributes(params, view);
     LOG.trace(String.format("Creating view %s", tableName));
-    if (!createTable(view, params.if_not_exists, null, params.server_name,
-        new ArrayList<>(), new ArrayList<>(), wantMinimalResult, response)) {
+    if (!createTable(view, params.if_not_exists, /*cacheOp*/null, params.server_name,
+        /*primaryKeys*/null, /*foreignKeys*/null, wantMinimalResult, response,
+        catalogTimeline)) {
       addSummary(response, "View already exists.");
     } else {
       addSummary(response, "View has been created.");
@@ -3699,8 +3766,8 @@ public class CatalogOpExecutor {
    * Creates a new Iceberg table.
    */
   private boolean createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      boolean wantMinimalResult, TDdlExecResponse response, boolean ifNotExists,
-      List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
+      boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline,
+      boolean ifNotExists, List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
       Map<String, String> tableProperties, String tblComment) throws ImpalaException {
     Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
 
@@ -3708,13 +3775,14 @@ public class CatalogOpExecutor {
     try {
       // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
-      List<NotificationEvent> events = Collections.emptyList();
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      List<NotificationEvent> events;
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
         boolean tableInMetastore =
             msClient.getHiveClient().tableExists(newTable.getDbName(),
                 newTable.getTableName());
+        catalogTimeline.markEvent(CHECKED_HMS_TABLE_EXISTENCE);
         if (!tableInMetastore) {
-          long eventId = getCurrentEventId(msClient);
+          long eventId = getCurrentEventId(msClient, catalogTimeline);
           TIcebergCatalog catalog = IcebergUtil.getTIcebergCatalog(newTable);
           String location = newTable.getSd().getLocation();
           //Create table in iceberg if necessary
@@ -3737,6 +3805,7 @@ public class CatalogOpExecutor {
                 IcebergUtil.getIcebergTableIdentifier(newTable), location, columns,
                 partitionSpec, tableProperties).location();
             newTable.getSd().setLocation(tableLoc);
+            catalogTimeline.markEvent(CREATED_ICEBERG_TABLE + catalog.name());
           } else {
             // If this is not a synchronized table, we assume that the table must be
             // existing in an Iceberg Catalog.
@@ -3758,6 +3827,7 @@ public class CatalogOpExecutor {
             TableIdentifier identifier = IcebergUtil.getIcebergTableIdentifier(newTable);
             org.apache.iceberg.Table iceTable = IcebergUtil.loadTable(
                 catalog, identifier, locationToLoadFrom, newTable.getParameters());
+            catalogTimeline.markEvent(LOADED_ICEBERG_TABLE);
             // Populate the HMS table schema based on the Iceberg table's schema because
             // the Iceberg metadata is the source of truth. This also avoids an
             // unnecessary ALTER TABLE.
@@ -3776,8 +3846,9 @@ public class CatalogOpExecutor {
               newTable.getPartitionKeys().isEmpty());
           if (!isIcebergHmsIntegrationEnabled(newTable)) {
             msClient.getHiveClient().createTable(newTable);
+            catalogTimeline.markEvent(CREATED_HMS_TABLE);
           }
-          events = getNextMetastoreEventsIfEnabled(eventId, event ->
+          events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, event ->
               CreateTableEvent.CREATE_TABLE_EVENT_TYPE.equals(event.getEventType())
                   && newTable.getDbName().equalsIgnoreCase(event.getDbName())
                   && newTable.getTableName().equalsIgnoreCase(event.getTableName()));
@@ -3792,11 +3863,12 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTable =
           eventTblPair == null ? null : eventTblPair.second;
       setTableNameAndCreateTimeInResponse(msTable,
-          newTable.getDbName(), newTable.getTableName(), response);
+          newTable.getDbName(), newTable.getTableName(), response, catalogTimeline);
       // Add the table to the catalog cache
       Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
           newTable.getTableName(), TImpalaTableType.TABLE, tblComment,
           createEventId);
+      catalogTimeline.markEvent(CREATED_CATALOG_TABLE);
       LOG.debug("Created an iceberg table {} in catalog with create event Id {} ",
           newTbl.getFullName(), createEventId);
       addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
@@ -3808,6 +3880,7 @@ public class CatalogOpExecutor {
         // extra "ALTER TABLE SET OWNER" step is required.
         setIcebergTableOwnerAfterCreateTable(newTable.getDbName(),
             newTable.getTableName(), newTable.getOwner());
+        catalogTimeline.markEvent(SET_ICEBERG_TABLE_OWNER);
         LOG.debug("Table owner has been changed to " + newTable.getOwner());
       } catch (Exception e) {
         LOG.warn("Failed to set table owner after creating " +
@@ -3852,7 +3925,8 @@ public class CatalogOpExecutor {
    * @param  syncDdl tells is SYNC_DDL is enabled for this DDL request.
    */
   private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response,
-      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
+      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult)
+      throws ImpalaException {
     Preconditions.checkNotNull(params);
     THdfsFileFormat fileFormat =
         params.isSetFile_format() ? params.getFile_format() : null;
@@ -3985,16 +4059,18 @@ public class CatalogOpExecutor {
       for (Column col: srcIceTable.getColumns()) columns.add(col.toThrift());
       TIcebergPartitionSpec partitionSpec = srcIceTable.getDefaultPartitionSpec()
           .toThrift();
-      createIcebergTable(tbl, wantMinimalResult, response, params.if_not_exists, columns,
-          partitionSpec, tableProperties, params.getComment());
+      createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
+          params.if_not_exists, columns, partitionSpec, tableProperties,
+          params.getComment());
     } else if (srcTable instanceof KuduTable && KuduTable.isKuduTable(tbl)) {
       TCreateTableParams createTableParams =
           extractKuduCreateTableParams(params, tblName, (KuduTable) srcTable, tbl);
-      createKuduTable(tbl, createTableParams, wantMinimalResult, response);
+      createKuduTable(tbl, createTableParams, wantMinimalResult, response,
+          catalogTimeline);
     } else {
       MetastoreShim.setTableLocation(catalog_.getDb(tbl.getDbName()), tbl);
       createTable(tbl, params.if_not_exists, null, params.server_name, null, null,
-          wantMinimalResult, response);
+          wantMinimalResult, response, catalogTimeline);
     }
   }
 
@@ -4870,6 +4946,14 @@ public class CatalogOpExecutor {
     return metastoreDdlLock_;
   }
 
+  /**
+   * Acquires 'metastoreDdlLock_' and also updates 'catalogTimeline' when it's done.
+   */
+  private void acquireMetastoreDdlLock(EventSequence catalogTimeline) {
+    metastoreDdlLock_.lock();
+    catalogTimeline.markEvent(GOT_METASTORE_DDL_LOCK);
+  }
+
   /**
    * Adds partitions in 'allHmsPartitionsToAdd' in batches via 'msClient'.
    * Returns the created partitions.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index b2124a1a4..794a65a79 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2287,7 +2287,7 @@ public class Frontend {
     }
   }
 
-  private static TRuntimeProfileNode createTRuntimeProfileNode(
+  public static TRuntimeProfileNode createTRuntimeProfileNode(
       String childrenProfileName) {
     return new TRuntimeProfileNode(childrenProfileName,
         /*num_children=*/0,
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 901bf3da8..738e21fbf 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -41,6 +41,7 @@ import org.apache.impala.thrift.TKuduPartitionParam;
 import org.apache.impala.thrift.TKuduPartitionByHashParam;
 import org.apache.impala.thrift.TRangePartition;
 import org.apache.impala.thrift.TRangePartitionOperationType;
+import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
@@ -70,16 +71,59 @@ import com.google.common.collect.Sets;
 public class KuduCatalogOpExecutor {
   public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);
 
+  // Labels used in catalog timeline
+  private static final String CHECKED_KUDU_TABLE_EXISTENCE =
+      "Checked table existence in Kudu";
+  private static final String CREATED_KUDU_TABLE = "Created table in Kudu";
+  private static final String GOT_KUDU_CLIENT = "Got Kudu client";
+  private static final String GOT_KUDU_DDL_LOCK = "Got kuduDdlLock";
+  private static final String OPENED_KUDU_TABLE = "Opened Kudu table";
+  private static final String POPULATED_COLS_FROM_KUDU =
+      "Populated external table cols from Kudu";
+
   private static final Object kuduDdlLock_ = new Object();
 
+  /**
+   * Wrapper to get kudu client and mark the given 'catalogTimeline' when it finishes.
+   */
+  private static KuduClient getKuduClient(String masterHosts,
+      EventSequence catalogTimeline) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    catalogTimeline.markEvent(GOT_KUDU_CLIENT);
+    return kudu;
+  }
+
+  /**
+   * Wrapper to check kudu table existence and mark the given 'catalogTimeline' when it
+   * finishes.
+   */
+  private static boolean checkTableExistence(KuduClient client, String kuduTableName,
+      EventSequence catalogTimeline) throws KuduException {
+    boolean tableExists = client.tableExists(kuduTableName);
+    catalogTimeline.markEvent(CHECKED_KUDU_TABLE_EXISTENCE);
+    return tableExists;
+  }
+
+  /**
+   * Wrapper to create the kudu table and mark the given 'catalogTimeline' when it
+   * finishes.
+   */
+  private static org.apache.kudu.client.KuduTable createKuduTable(KuduClient client,
+      String name, Schema schema, CreateTableOptions tableOpts,
+      EventSequence catalogTimeline) throws KuduException {
+    org.apache.kudu.client.KuduTable table = client.createTable(name, schema, tableOpts);
+    catalogTimeline.markEvent(CREATED_KUDU_TABLE);
+    return table;
+  }
+
   /**
    * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'.
    * Throws an exception if 'msTbl' represents an external table or if the table couldn't
    * be created in Kudu.
    */
-  public static void createSynchronizedTable(
-          org.apache.hadoop.hive.metastore.api.Table msTbl,
-          TCreateTableParams params) throws ImpalaRuntimeException {
+  public static void createSynchronizedTable(EventSequence catalogTimeline,
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TCreateTableParams params) throws ImpalaRuntimeException {
     Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
     Preconditions.checkState(
         msTbl.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
@@ -89,13 +133,14 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Creating table '%s' in master '%s'", kuduTableName,
           masterHosts));
     }
-    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    KuduClient kudu = getKuduClient(masterHosts, catalogTimeline);
     try {
       // Acquire lock to protect table existence check and table creation, see IMPALA-8984
       synchronized (kuduDdlLock_) {
+        catalogTimeline.markEvent(GOT_KUDU_DDL_LOCK);
         // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
         // (see KUDU-1710).
-        boolean tableExists = kudu.tableExists(kuduTableName);
+        boolean tableExists = checkTableExistence(kudu, kuduTableName, catalogTimeline);
         if (tableExists && params.if_not_exists) return;
 
         // if table is managed or external with external.purge.table = true in
@@ -107,8 +152,8 @@ public class KuduCatalogOpExecutor {
         Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
         Schema schema = createTableSchema(params);
         CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
-        org.apache.kudu.client.KuduTable table =
-            kudu.createTable(kuduTableName, schema, tableOpts);
+        org.apache.kudu.client.KuduTable table = createKuduTable(
+            kudu, kuduTableName, schema, tableOpts, catalogTimeline);
         // Populate table ID from Kudu table if Kudu's integration with the Hive
         // Metastore is enabled.
         if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
@@ -312,7 +357,7 @@ public class KuduCatalogOpExecutor {
    * an equivalent schema for external tables. Throws an exception if any errors
    * are encountered.
    */
-  public static void populateExternalTableColsFromKudu(
+  public static void populateExternalTableColsFromKudu(EventSequence catalogTimeline,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
     org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
     List<FieldSchema> cols = msTblCopy.getSd().getCols();
@@ -327,13 +372,14 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Loading schema of table '%s' from master '%s'",
           kuduTableName, masterHosts));
     }
-    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    KuduClient kudu = getKuduClient(masterHosts, catalogTimeline);
     try {
-      if (!kudu.tableExists(kuduTableName)) {
+      if (!checkTableExistence(kudu, kuduTableName, catalogTimeline)) {
         throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " +
             "'%s'", kuduTableName));
       }
       org.apache.kudu.client.KuduTable kuduTable = kudu.openTable(kuduTableName);
+      catalogTimeline.markEvent(OPENED_KUDU_TABLE);
       // Replace the columns in the Metastore table with the columns from the recently
       // accessed Kudu schema.
       cols.clear();
@@ -358,6 +404,7 @@ public class KuduCatalogOpExecutor {
     List<FieldSchema> newCols = msTbl.getSd().getCols();
     newCols.clear();
     newCols.addAll(cols);
+    catalogTimeline.markEvent(POPULATED_COLS_FROM_KUDU);
   }
 
   /**
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 175b56568..d408b10ee 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -440,6 +440,14 @@ class TestObservability(ImpalaTestSuite):
       r'Query Timeline:',
       r'Planning finished'
     ]
+    # Events for DDLs. Currently just CreateTable has them.
+    ddl_event_regexes = [
+      r'Catalog Server Operation:',
+      r'Got metastoreDdlLock:',
+      r'Created table in Metastore:',
+      r'Created table in catalog cache:',
+      r'DDL finished:',
+    ]
     # queries that explore different code paths in Frontend compilation
     queries = [
       'create table if not exists impala_6568 (i int)',
@@ -454,6 +462,10 @@ class TestObservability(ImpalaTestSuite):
       runtime_profile = self.execute_query(query).runtime_profile
       # and check that all the expected events appear in the resulting profile
       self.__verify_profile_contains_every_event(event_regexes, runtime_profile, query)
+      # Verify catalogOp timeline
+      if query.startswith("create table"):
+        self.__verify_profile_contains_every_event(
+            ddl_event_regexes, runtime_profile, query)
 
   def __verify_profile_contains_every_event(self, event_regexes, runtime_profile, query):
     """Test that all the expected events show up in a given query profile."""
@@ -462,6 +474,54 @@ class TestObservability(ImpalaTestSuite):
           "Didn't find event '" + regex + "' for query '" + query + \
           "' in profile: \n" + runtime_profile
 
+  def test_create_table_profile_events(self, unique_database):
+    # Create normal table
+    stmt = "create table %s.t1(id int)" % unique_database
+    self.__verify_event_labels_in_profile(stmt, [
+        "Got metastoreDdlLock",
+        "Got Metastore client",
+        "Got current Metastore event id",
+        "Created table in Metastore",
+        "Fetched event batch from Metastore",
+        "Created table in catalog cache",
+        "DDL finished"
+    ])
+    # Create Kudu table
+    stmt = "create table %s.t2(id int, name string, primary key(id))" \
+           " partition by hash(id) partitions 3 stored as kudu" % unique_database
+    self.__verify_event_labels_in_profile(stmt, [
+        "Got Metastore client",
+        "Got current Metastore event id",
+        "Got Kudu client",
+        "Got kuduDdlLock",
+        "Checked table existence in Kudu",
+        "Created table in Kudu",
+        "Got metastoreDdlLock",
+        "Got Metastore client",
+        "Checked table existence in Metastore",
+        "Created table in Metastore",
+        "Fetched event batch from Metastore",
+        "Created table in catalog cache",
+        "DDL finished",
+    ])
+    # Create Iceberg table
+    stmt = "create table %s.t3(id int) stored as iceberg" % unique_database
+    self.__verify_event_labels_in_profile(stmt, [
+        "Got Metastore client",
+        "Checked table existence in Metastore",
+        "Got current Metastore event id",
+        "Created table using Iceberg Catalog HIVE_CATALOG",
+        "Fetched event batch from Metastore",
+        "Created table in catalog cache",
+        "Set Iceberg table owner in Metastore",
+        "DDL finished",
+    ])
+
+  def __verify_event_labels_in_profile(self, stmt, event_labels):
+    profile = self.execute_query(stmt).runtime_profile
+    for label in event_labels:
+      assert label in profile
+
   def test_compute_stats_profile(self, unique_database):
     """Test that the profile for a 'compute stats' query contains three unique query ids:
     one for the parent 'compute stats' query and one each for the two child queries."""