You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/26 23:17:43 UTC

(impala) branch master updated: IMPALA-12747: Atomic update of execution state

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ac2ddbf IMPALA-12747: Atomic update of execution state
f3ac2ddbf is described below

commit f3ac2ddbfef0d7cd359b7c9ae47d424791327c6d
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Tue Jan 2 11:32:23 2024 -0800

    IMPALA-12747: Atomic update of execution state
    
    QueryDriver owns instances of ClientRequestState and TExecRequest. The
    ClientRequestState is used to track execution state of the client-facing
    side of a query. TExecRequest encapsulates context about the query
    produced by the planner.
    
    When a QueryDriver is created, it creates an instance of
    ClientRequestState, but has not yet executed planning. It would create
    an empty TExecRequest and pass a pointer to it to ClientRequestState,
    then update the content of TExecRequest when RunFrontendPlanner is
    called from ImpalaServer::ExecuteInternal.
    
    Updating TExecRequest was not atomic, so it was possible that other
    operations - like producing a QueryStateRecord for /queries in the web
    UI - would try to read the content of TExecRequest while it was being
    updated. This caused TSAN errors and occasional crashes in
    internal-server-test, which runs concurrent requests and examines them
    through calls to /queries.
    
    Changes ClientRequestState to
    - Provide a static placeholder for TExecRequest during creation that
      represents an empty context for an UNKNOWN statement type (default
      initialized in Thrift).
    - Make all references to TExecRequest const so its content cannot be
      updated in a non-thread-safe manner.
    - Use an AtomicPtr that is updated atomically once the filled
      TExecRequest is available.
    
    QueryDriver does not publicly expose access to TExecRequest, so we can
    ensure its use is thread-safe without atomics.
    
    ClientRequestState::exec_request() returns a reference to either the
    static placeholder or the TExecRequest provided later via
    SetExecRequest(). The latter is never changed once set, so the returned
    reference will always be valid for the lifetime of the
    ClientRequestState.
    
    Updates user_has_profile_access to be AtomicBool for the same reason.
    
    Reverts tsan-suppressions for IMPALA-12660 so we get TSAN coverage. Adds
    suppression for a lock-order-inversion bug (IMPALA-12757) that was
    uncovered after fixing this data race.
    
    Testing:
    - InternalServerTest.SimultaneousMultipleQueriesOneSession previously
      failed after ~10 test runs. With this change it ran 90 times without
      failure.
    - Passed TSAN run of backend tests.
    
    Change-Id: I9a967c5c84b6a401f8f5764373f6cd7ee807545f
    Reviewed-on: http://gerrit.cloudera.org:8080/20956
    Reviewed-by: Jason Fehr <jf...@cloudera.com>
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/query-driver.cc         | 104 +++++++++++++++++------
 be/src/runtime/query-driver.h          |  18 ++--
 be/src/service/client-request-state.cc | 146 +++++++++++++++++----------------
 be/src/service/client-request-state.h  |  52 +++++++-----
 be/src/service/impala-http-handler.cc  |   5 +-
 be/src/service/impala-server.cc        |  55 ++-----------
 bin/tsan-suppressions.txt              |  11 +--
 common/thrift/Frontend.thrift          |   2 +-
 common/thrift/Types.thrift             |   1 +
 9 files changed, 209 insertions(+), 185 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 0a285639b..15e3c0e19 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -29,6 +29,11 @@
 #include "common/names.h"
 #include "common/thread-debug-info.h"
 
+// Dumps used for debugging and diffing ExecRequests in text form.
+DEFINE_string(dump_exec_request_path, "",
+    "If set, dump TExecRequest structures to {dump_exec_request_path}/"
+    "TExecRequest-{internal|external}.{query_id.hi}-{query_id.lo}");
+
 DECLARE_string(debug_actions);
 
 namespace impala {
@@ -47,51 +52,94 @@ void QueryDriver::CreateClientRequestState(const TQueryCtx& query_ctx,
   DCHECK(client_request_state_ == nullptr);
   ExecEnv* exec_env = ExecEnv::GetInstance();
   lock_guard<SpinLock> l(client_request_state_lock_);
-  exec_request_ = make_unique<TExecRequest>();
   client_request_state_ =
       make_unique<ClientRequestState>(query_ctx, exec_env->frontend(), parent_server_,
-          session_state, exec_request_.get(), query_handle->query_driver().get());
+          session_state, query_handle->query_driver().get());
   DCHECK(query_handle != nullptr);
   (*query_handle).SetClientRequestState(client_request_state_.get());
 }
 
-Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
+void DumpTExecReq(const TExecRequest& exec_request, const char* dump_type,
+    const TUniqueId& query_id) {  // Writes exec_request as line-broken text to a file.
+  if (FLAGS_dump_exec_request_path.empty()) return;  // No-op unless the flag is set.
+  int depth = 0;  // Open-paren nesting level; sets the indent after each line break.
+  std::stringstream tmpstr;
+  string fn(Substitute("$0/TExecRequest-$1.$2", FLAGS_dump_exec_request_path,
+      dump_type, PrintId(query_id, "-")));  // {path}/TExecRequest-{dump_type}.{query id}
+  std::ofstream ofs(fn);  // NOTE(review): open failure is not checked here.
+  tmpstr << exec_request;
+  std::string s = tmpstr.str();
+  const char *p = s.c_str();
+  const int len = s.length();
+  for (int i = 0; i < len; ++i) {
+    const char ch = p[i];
+    ofs << ch;  // Every character is copied through unchanged.
+    if (ch == '(') {
+      depth++;
+    } else if (ch == ')' && depth > 0) {  // A close paren at depth 0 hits the final else.
+      depth--;
+    } else if (ch == ',') {  // Comma: break the line, depth unchanged.
+    } else {
+      continue;  // Any other character: no line break.
+    }
+    ofs << '\n' << std::setw(depth) << " ";  // Pads to depth columns; one space minimum.
+  }
+}
+
+Status QueryDriver::DoFrontendPlanning(const TQueryCtx& query_ctx, bool use_request) {
   // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for
   // this query.
-  DCHECK(client_request_state_ != nullptr);
-  DCHECK(exec_request_ != nullptr);
+  TExecRequest exec_request;
   RETURN_IF_ERROR(
       DebugAction(query_ctx.client_request.query_options, "FRONTEND_PLANNER"));
   RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
-      ExecEnv::GetInstance()->frontend()->GetExecRequest(
-          query_ctx, exec_request_.get())));
+      ExecEnv::GetInstance()->frontend()->GetExecRequest(query_ctx, &exec_request)));
+
+  DumpTExecReq(exec_request, "internal", client_request_state_->query_id());
+  if (use_request) exec_request_.reset(new TExecRequest(move(exec_request)));
+  return Status::OK();
+}
+
+Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
+  DCHECK(client_request_state_ != nullptr);
+  DCHECK(exec_request_ == nullptr);
+  RETURN_IF_ERROR(DoFrontendPlanning(query_ctx));
+
+  client_request_state_->SetExecRequest(exec_request_.get());
   return Status::OK();
 }
 
 Status QueryDriver::SetExternalPlan(
-    const TQueryCtx& query_ctx, const TExecRequest& external_exec_request) {
-  // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for
-  // this query.
+    const TQueryCtx& query_ctx, TExecRequest external_exec_request) {
   DCHECK(client_request_state_ != nullptr);
-  DCHECK(exec_request_ != nullptr);
+  DCHECK(exec_request_ == nullptr);
+
+  if (!FLAGS_dump_exec_request_path.empty()) {
+    // Create and dump Impala planner results so we can compare with the external plan.
+    RETURN_IF_ERROR(DoFrontendPlanning(query_ctx, false));
+  }
+
   RETURN_IF_ERROR(
       DebugAction(query_ctx.client_request.query_options, "FRONTEND_PLANNER"));
-  *exec_request_.get() = external_exec_request;
   // Update query_id in the external request
-  exec_request_->query_exec_request.query_ctx.__set_query_id(
+  external_exec_request.query_exec_request.query_ctx.__set_query_id(
       client_request_state_->query_id());
   // Update coordinator related internal addresses in the external request
-  exec_request_->query_exec_request.query_ctx.__set_coord_hostname(
+  external_exec_request.query_exec_request.query_ctx.__set_coord_hostname(
       ExecEnv::GetInstance()->configured_backend_address().hostname);
   const TNetworkAddress& address =
       FromNetworkAddressPB(ExecEnv::GetInstance()->krpc_address());
   DCHECK(IsResolvedAddress(address));
-  exec_request_->query_exec_request.query_ctx.__set_coord_ip_address(address);
+  external_exec_request.query_exec_request.query_ctx.__set_coord_ip_address(address);
   // Update local_time_zone in the external request
-  exec_request_->query_exec_request.query_ctx.__set_local_time_zone(
+  external_exec_request.query_exec_request.query_ctx.__set_local_time_zone(
       query_ctx.local_time_zone);
-  exec_request_->query_exec_request.query_ctx.__set_now_string(
+  external_exec_request.query_exec_request.query_ctx.__set_now_string(
       query_ctx.now_string);
+  exec_request_.reset(new TExecRequest(move(external_exec_request)));
+
+  DumpTExecReq(*exec_request_, "external", client_request_state_->query_id());
+  client_request_state_->SetExecRequest(exec_request_.get());
   return Status::OK();
 }
 
@@ -126,6 +174,7 @@ void QueryDriver::TryQueryRetry(
   const TUniqueId& query_id = client_request_state->query_id();
   DCHECK(client_request_state->schedule() != nullptr);
 
+  DCHECK(exec_request_ != nullptr);
   if (exec_request_->query_options.retry_failed_queries) {
     lock_guard<mutex> l(*client_request_state->lock());
 
@@ -396,8 +445,8 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
   // query has been retried. Making a copy avoids any race conditions on the
   // exec_request_ since the retry_exec_request_ needs to set a new query id on the
   // TExecRequest object.
-  retry_exec_request_ = make_unique<TExecRequest>(*exec_request_);
-  TQueryCtx query_ctx = retry_exec_request_->query_exec_request.query_ctx;
+  unique_ptr<TExecRequest> exec_request = make_unique<TExecRequest>(*exec_request_);
+  TQueryCtx query_ctx = exec_request->query_exec_request.query_ctx;
   if (query_ctx.client_request.query_options.spool_all_results_for_retries) {
     // Reset this flag in the retry query since we won't retry again, so results can be
     // returned immediately.
@@ -413,7 +462,9 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
         << PrintId(client_request_state_->query_id());
   }
   parent_server_->PrepareQueryContext(&query_ctx);
-  retry_exec_request_->query_exec_request.__set_query_ctx(query_ctx);
+  exec_request->query_exec_request.__set_query_ctx(query_ctx);
+  // Move to a const owner to ensure TExecRequest will not be modified after this.
+  retry_exec_request_ = move(exec_request);
 
   ScopedThreadContext tdi_context(GetThreadDebugInfo(), query_ctx.query_id);
 
@@ -421,14 +472,13 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
   ExecEnv* exec_env = ExecEnv::GetInstance();
   *retry_request_state =
       make_unique<ClientRequestState>(query_ctx, exec_env->frontend(), parent_server_,
-          *session, retry_exec_request_.get(), request_state->parent_driver());
+          *session, request_state->parent_driver());
+  (*retry_request_state)->SetExecRequest(retry_exec_request_.get());
   (*retry_request_state)->SetOriginalId(request_state->query_id());
-  (*retry_request_state)
-      ->set_user_profile_access(
-          (*retry_request_state)->exec_request().user_has_profile_access);
-  if ((*retry_request_state)->exec_request().__isset.result_set_metadata) {
-    (*retry_request_state)
-        ->set_result_metadata((*retry_request_state)->exec_request().result_set_metadata);
+  (*retry_request_state)->set_user_profile_access(
+      retry_exec_request_->user_has_profile_access);
+  if (retry_exec_request_->__isset.result_set_metadata) {
+    (*retry_request_state)->set_result_metadata(retry_exec_request_->result_set_metadata);
   }
 }
 
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index 382aab9f4..b9ee2b3de 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -149,8 +149,8 @@ class QueryDriver {
   /// query string (TQueryCtx::TClientRequest::stmt).
   Status RunFrontendPlanner(const TQueryCtx& query_ctx) WARN_UNUSED_RESULT;
 
-  /// Similar to RunFrontendPlanner but takes TExecRequest from and external planner
-  Status SetExternalPlan(const TQueryCtx& query_ctx, const TExecRequest& exec_request);
+  /// Similar to RunFrontendPlanner but takes TExecRequest from an external planner
+  Status SetExternalPlan(const TQueryCtx& query_ctx, TExecRequest exec_request);
 
   /// Returns the ClientRequestState corresponding to the given query id.
   ClientRequestState* GetClientRequestState(const TUniqueId& query_id);
@@ -225,6 +225,12 @@ class QueryDriver {
       std::unique_ptr<ClientRequestState>* retry_request_state,
       std::shared_ptr<ImpalaServer::SessionState>* session);
 
+  /// Does the work of RunFrontendPlanner so we can also use it to dump the planner
+  /// result from SetExternalPlan to dump_exec_request_path without redundant work.
+  /// Set use_request to false to skip saving the TExecRequest produced in exec_request_.
+  Status DoFrontendPlanning(const TQueryCtx& query_ctx,
+      bool use_request = true) WARN_UNUSED_RESULT;
+
   /// Helper method for handling failures when retrying a query. 'status' is the reason
   /// why the retry failed and is expected to be in the error state. Additional details
   /// are added to the 'status'. Once the 'status' has been updated, it is set as the
@@ -248,12 +254,12 @@ class QueryDriver {
   std::unique_ptr<ClientRequestState> retried_client_request_state_;
 
   /// The TExecRequest for the query. Created in 'CreateClientRequestState' and loaded in
-  /// 'RunFrontendPlanner'.
-  std::unique_ptr<TExecRequest> exec_request_;
+  /// 'RunFrontendPlanner'. Not thread safe.
+  std::unique_ptr<const TExecRequest> exec_request_;
 
-  /// The TExecRequest for the retried query. Created in
+  /// The TExecRequest for the retried query. Created and initialized in
   /// 'CreateRetriedClientRequestState'.
-  std::unique_ptr<TExecRequest> retry_exec_request_;
+  std::unique_ptr<const TExecRequest> retry_exec_request_;
 
   /// Thread to process query retry requests. Done in a separate thread to avoid blocking
   /// control service RPC threads.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index f1eb16a36..761c12e7e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -92,9 +92,11 @@ static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk
 static const string QUERY_STATUS_KEY = "Query Status";
 static const string RETRY_STATUS_KEY = "Retry Status";
 
+const TExecRequest ClientRequestState::unknown_exec_request_;
+
 ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend,
     ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session,
-    TExecRequest* exec_request, QueryDriver* query_driver)
+    QueryDriver* query_driver)
   : query_ctx_(query_ctx),
     last_active_time_ms_(numeric_limits<int64_t>::max()),
     child_query_executor_(new ChildQueryExecutor),
@@ -105,7 +107,6 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro
     frontend_profile_(RuntimeProfile::Create(&profile_pool_, "Frontend", false)),
     server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer", false)),
     summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary", false)),
-    exec_request_(exec_request),
     frontend_(frontend),
     parent_server_(server),
     start_time_us_(UnixMicros()),
@@ -233,7 +234,7 @@ Status ClientRequestState::Exec() {
   summary_profile_->AddInfoString("Query Options (set by configuration)",
       DebugQueryOptions(query_ctx_.client_request.query_options));
   summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
-      DebugQueryOptions(exec_request_->query_options));
+      DebugQueryOptions(exec_request().query_options));
   if (query_ctx_.__isset.overridden_mt_dop_value) {
     DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop);
     summary_profile_->AddInfoString("MT_DOP limited by admission control",
@@ -242,41 +243,42 @@ Status ClientRequestState::Exec() {
             query_ctx_.client_request.query_options.mt_dop));
   }
 
-  switch (exec_request_->stmt_type) {
+  const TExecRequest& exec_req = exec_request();
+  switch (exec_req.stmt_type) {
     case TStmtType::QUERY:
     case TStmtType::DML:
-      DCHECK(exec_request_->__isset.query_exec_request);
+      DCHECK(exec_req.__isset.query_exec_request);
       RETURN_IF_ERROR(
-          ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/));
+          ExecQueryOrDmlRequest(exec_req.query_exec_request, true /*async*/));
       break;
     case TStmtType::EXPLAIN: {
       request_result_set_.reset(new vector<TResultRow>(
-          exec_request_->explain_result.results));
+          exec_req.explain_result.results));
       break;
     }
     case TStmtType::TESTCASE: {
-      DCHECK(exec_request_->__isset.testcase_data_path);
-      SetResultSet(vector<string>(1, exec_request_->testcase_data_path));
+      DCHECK(exec_req.__isset.testcase_data_path);
+      SetResultSet(vector<string>(1, exec_req.testcase_data_path));
       break;
     }
     case TStmtType::DDL: {
-      DCHECK(exec_request_->__isset.catalog_op_request);
+      DCHECK(exec_req.__isset.catalog_op_request);
       LOG_AND_RETURN_IF_ERROR(ExecDdlRequest());
       break;
     }
     case TStmtType::LOAD: {
-      DCHECK(exec_request_->__isset.load_data_request);
+      DCHECK(exec_req.__isset.load_data_request);
       LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest());
       break;
     }
     case TStmtType::SET: {
-      DCHECK(exec_request_->__isset.set_query_option_request);
+      DCHECK(exec_req.__isset.set_query_option_request);
       lock_guard<mutex> l(session_->lock);
-      if (exec_request_->set_query_option_request.__isset.key) {
+      if (exec_req.set_query_option_request.__isset.key) {
         // "SET key=value" updates the session query options.
-        DCHECK(exec_request_->set_query_option_request.__isset.value);
-        const auto& key = exec_request_->set_query_option_request.key;
-        const auto& value = exec_request_->set_query_option_request.value;
+        DCHECK(exec_req.set_query_option_request.__isset.value);
+        const auto& key = exec_req.set_query_option_request.key;
+        const auto& value = exec_req.set_query_option_request.value;
         RETURN_IF_ERROR(SetQueryOption(key, value, &session_->set_query_options,
               &session_->set_query_options_mask));
         SetResultSet({}, {}, {});
@@ -287,8 +289,8 @@ Status ClientRequestState::Exec() {
           VLOG_QUERY << "ClientRequestState::Exec() SET: idle_session_timeout="
                      << PrettyPrinter::Print(session_->session_timeout, TUnit::TIME_S);
         }
-      } else if (exec_request_->set_query_option_request.__isset.query_option_type
-          && exec_request_->set_query_option_request.query_option_type
+      } else if (exec_req.set_query_option_request.__isset.query_option_type
+          && exec_req.set_query_option_request.query_option_type
               == TQueryOptionType::UNSET_ALL) {
         // "UNSET ALL"
         RETURN_IF_ERROR(ResetAllQueryOptions(
@@ -297,25 +299,26 @@ Status ClientRequestState::Exec() {
       } else {
         // "SET" or "SET ALL"
         bool is_set_all =
-            exec_request_->set_query_option_request.__isset.query_option_type
-            && exec_request_->set_query_option_request.query_option_type
+            exec_req.set_query_option_request.__isset.query_option_type
+            && exec_req.set_query_option_request.query_option_type
                 == TQueryOptionType::SET_ALL;
         PopulateResultForSet(is_set_all);
       }
       break;
     }
     case TStmtType::ADMIN_FN:
-      DCHECK(exec_request_->admin_request.type == TAdminRequestType::SHUTDOWN);
+      DCHECK(exec_req.admin_request.type == TAdminRequestType::SHUTDOWN);
       RETURN_IF_ERROR(ExecShutdownRequest());
       break;
     case TStmtType::CONVERT:
-      DCHECK(exec_request_->__isset.convert_table_request);
+      DCHECK(exec_req.__isset.convert_table_request);
       LOG_AND_RETURN_IF_ERROR(ExecMigrateRequest());
       break;
+    case TStmtType::UNKNOWN:
+      DCHECK(false);
+      return Status("Exec request uninitialized during execution");
     default:
-      stringstream errmsg;
-      errmsg << "Unknown exec request stmt type: " << exec_request_->stmt_type;
-      return Status(errmsg.str());
+      return Status(Substitute("Unknown exec request stmt type: $0", exec_req.stmt_type));
   }
 
   if (async_exec_thread_.get() == nullptr) {
@@ -351,7 +354,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
   switch (catalog_op.op_type) {
     case TCatalogOpType::USE: {
       lock_guard<mutex> l(session_->lock);
-      session_->database = exec_request_->catalog_op_request.use_db_params.db;
+      session_->database = exec_request().catalog_op_request.use_db_params.db;
       return Status::OK();
     }
     case TCatalogOpType::SHOW_TABLES:
@@ -585,12 +588,13 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
 }
 
 void ClientRequestState::FinishExecQueryOrDmlRequest() {
-  DCHECK(exec_request_->__isset.query_exec_request);
+  const TExecRequest& exec_req = exec_request();
+  DCHECK(exec_req.__isset.query_exec_request);
   UniqueIdPB query_id_pb;
   TUniqueIdToUniqueIdPB(query_id(), &query_id_pb);
   Status admit_status = admission_control_client_->SubmitForAdmission(
       {query_id_pb, ExecEnv::GetInstance()->backend_id(),
-          exec_request_->query_exec_request, exec_request_->query_options,
+          exec_req.query_exec_request, exec_req.query_options,
           summary_profile_, blacklisted_executor_addresses_},
       query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_);
   {
@@ -602,15 +606,15 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
   // startup. The query was not cancelled right before being admitted and the window here
   // is small enough to not require special handling. Instead we start the query and then
   // cancel it through the check below if necessary.
-  DebugActionNoFail(exec_request_->query_options, "CRS_BEFORE_COORD_STARTS");
+  DebugActionNoFail(exec_req.query_options, "CRS_BEFORE_COORD_STARTS");
   // Register the query with the server to support cancellation. This happens after
   // admission because now the set of executors is fixed and an executor failure will
   // cause a query failure.
   parent_server_->RegisterQueryLocations(schedule_->backend_exec_params(), query_id());
-  coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
+  coord_.reset(new Coordinator(this, exec_req, *schedule_.get(), query_events_));
   Status exec_status = coord_->Exec();
 
-  DebugActionNoFail(exec_request_->query_options, "CRS_AFTER_COORD_STARTS");
+  DebugActionNoFail(exec_req.query_options, "CRS_AFTER_COORD_STARTS");
 
   // Make coordinator profile visible, even upon failure.
   if (coord_->query_profile() != nullptr) profile_->AddChild(coord_->query_profile());
@@ -646,14 +650,14 @@ Status ClientRequestState::ExecDdlRequestImplSync() {
 
   if (catalog_op_type() != TCatalogOpType::DDL &&
       catalog_op_type() != TCatalogOpType::RESET_METADATA) {
-    Status status = ExecLocalCatalogOp(exec_request_->catalog_op_request);
+    Status status = ExecLocalCatalogOp(exec_request().catalog_op_request);
     lock_guard<mutex> l(lock_);
     return UpdateQueryStatus(status);
   }
 
   if (ddl_type() == TDdlType::COMPUTE_STATS) {
-    TComputeStatsParams& compute_stats_params =
-        exec_request_->catalog_op_request.ddl_params.compute_stats_params;
+    const TComputeStatsParams& compute_stats_params =
+        exec_request().catalog_op_request.ddl_params.compute_stats_params;
     RuntimeProfile* child_profile =
         RuntimeProfile::Create(&profile_pool_, "Child Queries");
     profile_->AddChild(child_profile);
@@ -689,6 +693,7 @@ Status ClientRequestState::ExecDdlRequestImplSync() {
 void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   bool is_CTAS = (catalog_op_type() == TCatalogOpType::DDL
       && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
+  const TExecRequest& exec_req = exec_request();
 
   catalog_op_executor_.reset(
       new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
@@ -705,9 +710,9 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   }
 
   // Optionally wait with a debug action before Exec() below.
-  DebugActionNoFail(exec_request_->query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
+  DebugActionNoFail(exec_req.query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
 
-  Status status = catalog_op_executor_->Exec(exec_request_->catalog_op_request);
+  Status status = catalog_op_executor_->Exec(exec_req.catalog_op_request);
   query_events_->MarkEvent("CatalogDdlRequest finished");
   {
     lock_guard<mutex> l(lock_);
@@ -729,7 +734,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   if (catalog_op_type() == TCatalogOpType::DDL &&
       ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
       !catalog_op_executor_->ddl_exec_response()->new_table_created) {
-    DCHECK(exec_request_->catalog_op_request.
+    DCHECK(exec_req.catalog_op_request.
         ddl_params.create_table_params.if_not_exists);
     return;
   }
@@ -737,7 +742,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   // Add newly created table to catalog cache.
   status = parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl, query_options(), query_events_);
+      exec_req.query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
@@ -748,9 +753,9 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
     // like a normal DML request. As with other DML requests, it will
     // wait for another catalog update if any partitions were altered as a result
     // of the operation.
-    DCHECK(exec_request_->__isset.query_exec_request);
+    DCHECK(exec_req.__isset.query_exec_request);
     RETURN_VOID_IF_ERROR(
-        ExecQueryOrDmlRequest(exec_request_->query_exec_request, !exec_in_worker_thread));
+        ExecQueryOrDmlRequest(exec_req.query_exec_request, !exec_in_worker_thread));
   }
 
   // Set the results to be reported to the client. Do this under lock to avoid races
@@ -778,7 +783,7 @@ Status ClientRequestState::ExecDdlRequest() {
   string op_type = catalog_op_type() == TCatalogOpType::DDL ?
       PrintValue(ddl_type()) : PrintValue(catalog_op_type());
   bool async_ddl = ShouldRunExecDdlAsync();
-  bool async_ddl_enabled = exec_request_->query_options.enable_async_ddl_execution;
+  bool async_ddl_enabled = exec_request().query_options.enable_async_ddl_execution;
   string exec_mode = (async_ddl && async_ddl_enabled) ? "asynchronous" : "synchronous";
 
   summary_profile_->AddInfoString("DDL Type", op_type);
@@ -802,17 +807,18 @@ Status ClientRequestState::ExecDdlRequest() {
 }
 
 void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
+  const TExecRequest& exec_req = exec_request();
   if (exec_in_worker_thread) {
     VLOG_QUERY << "Running in worker thread";
     DCHECK(exec_state() == ExecState::PENDING);
     UpdateNonErrorExecState(ExecState::RUNNING);
   }
   DebugActionNoFail(
-      exec_request_->query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
+      exec_req.query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
 
   TLoadDataResp response;
-  Status status = frontend_->LoadData(exec_request_->load_data_request, &response);
-  if (exec_request_->load_data_request.iceberg_tbl) {
+  Status status = frontend_->LoadData(exec_req.load_data_request, &response);
+  if (exec_req.load_data_request.iceberg_tbl) {
     ExecLoadIcebergDataRequestImpl(response);
   }
   {
@@ -832,11 +838,11 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
   // The partition_name is an empty string for unpartitioned tables.
   catalog_update.updated_partitions[response.partition_name] = updatedPartition;
 
-  catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+  catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
   catalog_update.__set_header(GetCatalogServiceRequestHeader());
-  catalog_update.target_table = exec_request_->load_data_request.table_name.table_name;
-  catalog_update.db_name = exec_request_->load_data_request.table_name.db_name;
-  catalog_update.is_overwrite = exec_request_->load_data_request.overwrite;
+  catalog_update.target_table = exec_req.load_data_request.table_name.table_name;
+  catalog_update.db_name = exec_req.load_data_request.table_name.db_name;
+  catalog_update.is_overwrite = exec_req.load_data_request.overwrite;
 
   CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(),
       *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &status);
@@ -855,7 +861,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   status = parent_server_->ProcessCatalogUpdateResult(
       resp.result,
-      exec_request_->query_options.sync_ddl, query_options(), query_events_);
+      exec_req.query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
@@ -863,7 +869,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 }
 
 void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) {
-  TLoadDataReq load_data_req = exec_request_->load_data_request;
+  TLoadDataReq load_data_req = exec_request().load_data_request;
   RuntimeProfile* child_profile =
       RuntimeProfile::Create(&profile_pool_, "Child Queries");
   profile_->AddChild(child_profile);
@@ -934,7 +940,7 @@ void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response)
 
 
 Status ClientRequestState::ExecLoadDataRequest() {
-  if (exec_request_->query_options.enable_async_load_data_execution) {
+  if (exec_request().query_options.enable_async_load_data_execution) {
     // Transition the exec state out of INITIALIZED to PENDING to make available the
     // runtime profile for the DDL.
     UpdateNonErrorExecState(ExecState::PENDING);
@@ -948,7 +954,7 @@ Status ClientRequestState::ExecLoadDataRequest() {
 }
 
 Status ClientRequestState::ExecShutdownRequest() {
-  const TShutdownParams& request = exec_request_->admin_request.shutdown_params;
+  const TShutdownParams& request = exec_request().admin_request.shutdown_params;
   bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
   int port = backend_port_specified ? request.backend.port : FLAGS_krpc_port;
   // Use the local shutdown code path if the host is unspecified or if it exactly matches
@@ -1184,7 +1190,7 @@ void ClientRequestState::Wait() {
 
 Status ClientRequestState::WaitInternal() {
   // Explain requests have already populated the result set. Nothing to do here.
-  if (exec_request_->stmt_type == TStmtType::EXPLAIN) {
+  if (exec_request().stmt_type == TStmtType::EXPLAIN) {
     MarkInactive();
     return Status::OK();
   }
@@ -1509,22 +1515,23 @@ void ClientRequestState::Cancel(const Status* cause, bool wait_until_finalized)
 }
 
 Status ClientRequestState::UpdateCatalog() {
-  if (!exec_request_->__isset.query_exec_request ||
-      exec_request_->query_exec_request.stmt_type != TStmtType::DML) {
+  const TExecRequest& exec_req = exec_request();
+  if (!exec_req.__isset.query_exec_request ||
+      exec_req.query_exec_request.stmt_type != TStmtType::DML) {
     return Status::OK();
   }
 
   query_events_->MarkEvent("DML data written");
   SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
 
-  const TQueryExecRequest& query_exec_request = exec_request_->query_exec_request;
+  const TQueryExecRequest& query_exec_request = exec_req.query_exec_request;
   if (query_exec_request.__isset.finalize_params) {
     const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
     TUpdateCatalogRequest catalog_update;
-    catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+    catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
     catalog_update.__set_header(GetCatalogServiceRequestHeader());
-    if (exec_request_->query_options.__isset.debug_action) {
-      catalog_update.__set_debug_action(exec_request_->query_options.debug_action);
+    if (exec_req.query_options.__isset.debug_action) {
+      catalog_update.__set_debug_action(exec_req.query_options.debug_action);
     }
     DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
     if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update)) {
@@ -1587,7 +1594,7 @@ Status ClientRequestState::UpdateCatalog() {
         query_events_->MarkEvent("Transaction committed");
       }
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
-          exec_request_->query_options.sync_ddl, query_options(), query_events_));
+          exec_req.query_options.sync_ddl, query_options(), query_events_));
     }
   } else if (InKuduTransaction()) {
     // Commit the Kudu transaction. Clear transaction state if it's successful.
@@ -1768,9 +1775,10 @@ Status ClientRequestState::UpdateTableAndColumnStats(
     col_stats_data = child_queries[1]->result_data();
   }
 
+  const TExecRequest& exec_req = exec_request();
   Status status = catalog_op_executor_->ExecComputeStats(
       GetCatalogServiceRequestHeader(),
-      exec_request_->catalog_op_request,
+      exec_req.catalog_op_request,
       child_queries[0]->result_schema(),
       child_queries[0]->result_data(),
       col_stats_schema,
@@ -1781,7 +1789,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
   RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl, query_options(), query_events_));
+      exec_req.query_options.sync_ddl, query_options(), query_events_));
 
   // Set the results to be reported to the client.
   SetResultSet(catalog_op_executor_->ddl_exec_response());
@@ -1909,11 +1917,11 @@ void ClientRequestState::UpdateEndTime() {
 
 int64_t ClientRequestState::GetTransactionId() const {
   DCHECK(InTransaction());
-  return exec_request_->query_exec_request.finalize_params.transaction_id;
+  return exec_request().query_exec_request.finalize_params.transaction_id;
 }
 
 bool ClientRequestState::InTransaction() const {
-  return exec_request_->query_exec_request.finalize_params.__isset.transaction_id &&
+  return exec_request().query_exec_request.finalize_params.__isset.transaction_id &&
       !transaction_closed_;
 }
 
@@ -1935,8 +1943,7 @@ void ClientRequestState::ClearTransactionState() {
 bool ClientRequestState::InKuduTransaction() const {
   // If Kudu transaction is opened, TQueryExecRequest.query_ctx.is_kudu_transactional
   // is set as true by Frontend.doCreateExecRequest().
-  DCHECK(exec_request_ != nullptr);
-  return (exec_request_->query_exec_request.query_ctx.is_kudu_transactional
+  return (exec_request().query_exec_request.query_ctx.is_kudu_transactional
       && !transaction_closed_);
 }
 
@@ -1955,7 +1962,7 @@ Status ClientRequestState::CommitKuduTransaction() {
   DCHECK(InKuduTransaction());
   // Skip calling Commit() for Kudu Transaction with a debug action so that test code
   // could explicitly control over calling Commit().
-  Status status = DebugAction(exec_request_->query_options, "CRS_NOT_COMMIT_KUDU_TXN");
+  Status status = DebugAction(exec_request().query_options, "CRS_NOT_COMMIT_KUDU_TXN");
   if (UNLIKELY(!status.ok())) {
     VLOG(1) << Substitute("Skip to commit Kudu transaction with query-id: $0",
         PrintId(query_ctx_.query_id));
@@ -1988,6 +1995,7 @@ void ClientRequestState::LogQueryEvents() {
     case TStmtType::QUERY:
     case TStmtType::DML:
     case TStmtType::DDL:
+    case TStmtType::UNKNOWN:
       log_events = status.ok();
       break;
     case TStmtType::EXPLAIN:
@@ -2255,7 +2263,7 @@ Status ClientRequestState::ExecMigrateRequest() {
 void ClientRequestState::ExecMigrateRequestImpl() {
   // A convert table request holds the query strings for the sub-queries. These are
   // populated by ConvertTableToIcebergStmt in the Frontend during analysis.
-  TConvertTableRequest& params = exec_request_->convert_table_request;
+  const TConvertTableRequest& params = exec_request().convert_table_request;
   {
     RuntimeProfile* child_profile =
         RuntimeProfile::Create(&profile_pool_, "Child Queries 1");
@@ -2295,7 +2303,7 @@ void ClientRequestState::ExecMigrateRequestImpl() {
     }
   }
   // Create an external Iceberg table using the data of the HDFS table.
-  Status status = frontend_->Convert(*exec_request_);
+  Status status = frontend_->Convert(exec_request());
   if (!status.ok()) AddTableResetHints(params, &status);
   {
     lock_guard<mutex> l(lock_);
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 459fce310..a9f8c1912 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -78,8 +78,7 @@ class QuerySchedulePB;
 class ClientRequestState {
  public:
   ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend, ImpalaServer* server,
-      std::shared_ptr<ImpalaServer::SessionState> session, TExecRequest* exec_request,
-      QueryDriver* query_driver);
+      std::shared_ptr<ImpalaServer::SessionState> session, QueryDriver* query_driver);
 
   ~ClientRequestState();
 
@@ -87,6 +86,12 @@ class ClientRequestState {
 
   enum class RetryState { RETRYING, RETRIED, NOT_RETRIED };
 
+  /// Initialize TExecRequest. Prior to this, exec_request() returns a place-holder.
+  void SetExecRequest(const TExecRequest* exec_request) {
+    DCHECK(exec_request_.Load()->stmt_type == TStmtType::UNKNOWN);
+    exec_request_.Store(exec_request);
+  }
+
   /// Sets the profile that is produced by the frontend. The frontend creates the
   /// profile during planning and returns it to the backend via TExecRequest,
   /// which then sets the frontend profile.
@@ -234,7 +239,7 @@ class ClientRequestState {
   /// Queries are run and authorized on behalf of the effective_user.
   const std::string& effective_user() const;
   const std::string& connected_user() const { return query_ctx_.session.connected_user; }
-  bool user_has_profile_access() const { return user_has_profile_access_; }
+  bool user_has_profile_access() const { return user_has_profile_access_.Load(); }
   const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
   TSessionType::type session_type() const { return query_ctx_.session.session_type; }
   const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
@@ -253,14 +258,15 @@ class ClientRequestState {
   /// control.
   /// Admission control resource pool associated with this query.
   std::string request_pool() const {
-    if (is_planning_done_.load() && exec_request_ != nullptr
-        && exec_request_->query_exec_request.query_ctx.__isset.request_pool) {
-      // If the request pool has been set by Planner, return the request pool selected
-      // by Planner.
-      return exec_request_->query_exec_request.query_ctx.request_pool;
-    } else {
-      return query_ctx_.__isset.request_pool ? query_ctx_.request_pool : "";
+    if (is_planning_done_.load()) {
+      const TExecRequest& exec_req = exec_request();
+      if (exec_req.query_exec_request.query_ctx.__isset.request_pool) {
+        // If the request pool has been set by Planner, return the request pool selected
+        // by Planner.
+        return exec_req.query_exec_request.query_ctx.request_pool;
+      }
     }
+    return query_ctx_.__isset.request_pool ? query_ctx_.request_pool : "";
   }
 
   int num_rows_fetched() const { return num_rows_fetched_; }
@@ -270,18 +276,18 @@ class ClientRequestState {
   const TResultSetMetadata* result_metadata() const { return &result_metadata_; }
   const TUniqueId& query_id() const { return query_ctx_.query_id; }
   /// Returns the TExecRequest for the query associated with this ClientRequestState.
-  /// Contents are only valid after InitExecRequest(TQueryCtx) initializes the
+  /// Contents are a place-holder until SetExecRequest() installs the finalized
   /// TExecRequest.
   const TExecRequest& exec_request() const {
-    DCHECK(exec_request_ != nullptr);
-    return *exec_request_;
+    DCHECK(exec_request_.Load() != nullptr);
+    return *exec_request_.Load();
   }
-  TStmtType::type stmt_type() const { return exec_request_->stmt_type; }
+  TStmtType::type stmt_type() const { return exec_request().stmt_type; }
   TCatalogOpType::type catalog_op_type() const {
-    return exec_request_->catalog_op_request.op_type;
+    return exec_request().catalog_op_request.op_type;
   }
   TDdlType::type ddl_type() const {
-    return exec_request_->catalog_op_request.ddl_params.ddl_type;
+    return exec_request().catalog_op_request.ddl_params.ddl_type;
   }
   std::mutex* lock() { return &lock_; }
   std::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
@@ -302,7 +308,7 @@ class ClientRequestState {
   const Status& query_status() const { return query_status_; }
   void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
   void set_user_profile_access(bool user_has_profile_access) {
-    user_has_profile_access_ = user_has_profile_access;
+    user_has_profile_access_.Store(user_has_profile_access);
   }
   const RuntimeProfile* profile() const { return profile_; }
   const RuntimeProfile* summary_profile() const { return summary_profile_; }
@@ -316,7 +322,7 @@ class ClientRequestState {
   TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
 
   const std::vector<std::string>& GetAnalysisWarnings() const {
-    return exec_request_->analysis_warnings;
+    return exec_request().analysis_warnings;
   }
 
   inline int64_t last_active_ms() const {
@@ -660,14 +666,16 @@ class ClientRequestState {
   /// UpdateQueryStatus(Status) or MarkAsRetrying(Status).
   Status query_status_;
 
+  /// Default TExecRequest to return until SetExecRequest is called.
+  static const TExecRequest unknown_exec_request_;
+
   /// The TExecRequest for the query tracked by this ClientRequestState. The TExecRequest
-  /// is initialized in QueryDriver::RunFrontendPlanner(TQueryCtx).The TExecRequest is
-  /// owned by the parent QueryDriver.
-  TExecRequest* exec_request_;
+  /// is set by SetExecRequest once it's finalized. It's owned by the parent QueryDriver.
+  AtomicPtr<const TExecRequest> exec_request_{&unknown_exec_request_};
 
   /// If true, effective_user() has access to the runtime profile and execution
   /// summary.
-  bool user_has_profile_access_ = true;
+  AtomicBool user_has_profile_access_{true};
 
   TResultSetMetadata result_metadata_; // metadata for select query
   int num_rows_fetched_ = 0; // number of rows fetched by client for the entire query
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b17afb383..79de37006 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -1206,14 +1206,15 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
       lock_guard<mutex> l(*(*query_handle).lock());
       query_status = query_handle->query_status();
       stmt = query_handle->sql_stmt();
-      plan = query_handle->exec_request().query_exec_request.query_plan;
+      const TExecRequest& exec_request = query_handle->exec_request();
+      plan = exec_request.query_exec_request.query_plan;
       if ((include_json_plan || include_summary)
           && query_handle->GetCoordinator() != nullptr) {
         query_handle->GetCoordinator()->GetTExecSummary(&summary);
       }
       if (include_json_plan) {
         for (const TPlanExecInfo& plan_exec_info:
-            query_handle->exec_request().query_exec_request.plan_exec_info) {
+            exec_request.query_exec_request.plan_exec_info) {
           for (const TPlanFragment& fragment: plan_exec_info.fragments) {
             fragments.push_back(fragment);
           }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 21b778302..6217956fb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -349,11 +349,6 @@ DEFINE_int32(query_event_hook_nthreads, 1, "Number of threads to use for "
     "QueryEventHook execution. If this number is >1 then hooks will execute "
     "concurrently.");
 
-// Dumps used for debugging and diffing ExecRequests in text form.
-DEFINE_string(dump_exec_request_path, "",
-    "If set, dump TExecRequest structures to {dump_exec_request_path}/"
-    "TExecRequest-{internal|external}.{query_id.hi}-{query_id.lo}");
-
 DECLARE_bool(compact_catalog_topic);
 
 DEFINE_bool(use_local_tz_for_unix_timestamp_conversions, false,
@@ -1256,33 +1251,6 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> sess
   return status;
 }
 
-void DumpTExecReq(const TExecRequest& exec_request, const char* dump_type,
-    const TUniqueId& query_id) {
-  if (FLAGS_dump_exec_request_path.empty()) return;
-  int depth = 0;
-  std::stringstream tmpstr;
-  string fn(Substitute("$0/TExecRequest-$1.$2", FLAGS_dump_exec_request_path,
-      dump_type, PrintId(query_id, "-")));
-  std::ofstream ofs(fn);
-  tmpstr << exec_request;
-  std::string s = tmpstr.str();
-  const char *p = s.c_str();
-  const int len = s.length();
-  for (int i = 0; i < len; ++i) {
-    const char ch = p[i];
-    ofs << ch;
-    if (ch == '(') {
-      depth++;
-    } else if (ch == ')' && depth > 0) {
-      depth--;
-    } else if (ch == ',') {
-    } else {
-      continue;
-    }
-    ofs << '\n' << std::setw(depth) << " ";
-  }
-}
-
 Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
     const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state,
     bool* registered_query, QueryHandle* query_handle) {
@@ -1329,17 +1297,6 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
           statement_length, max_statement_length));
     }
 
-    Status exec_status = Status::OK();
-    TUniqueId query_id = (*query_handle)->query_id();
-    // Generate TExecRequest here if one was not passed in or we want one
-    // from the Impala planner to compare with
-    if (!is_external_req || !FLAGS_dump_exec_request_path.empty()) {
-      // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest
-      // for this query.
-      RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
-      DumpTExecReq((*query_handle)->exec_request(), "internal", query_id);
-    }
-
     if (is_external_req) {
       // Use passed in exec_request
       RETURN_IF_ERROR(query_handle->query_driver()->SetExternalPlan(
@@ -1349,8 +1306,9 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
         RETURN_IF_ERROR(exec_env_->frontend()->addTransaction(
             external_exec_request->query_exec_request.query_ctx));
       }
-      exec_status = Status::OK();
-      DumpTExecReq((*query_handle)->exec_request(), "external", query_id);
+    } else {
+      // Generate TExecRequest here if one was not passed in
+      RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
     }
 
     const TExecRequest& result = (*query_handle)->exec_request();
@@ -2531,12 +2489,10 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& query
 
 void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle) {
   id = query_handle.query_id();
-  const TExecRequest& request = query_handle.exec_request();
 
   const string* plan_str = query_handle.summary_profile()->GetInfoString("Plan");
   if (plan_str != nullptr) plan = *plan_str;
   stmt = query_handle.sql_stmt();
-  stmt_type = request.stmt_type;
   effective_user = query_handle.effective_user();
   default_db = query_handle.default_db();
   start_time_us = query_handle.start_time_us();
@@ -2579,9 +2535,10 @@ void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle
 
   query_handle.query_events()->ToThrift(&event_sequence);
 
+  const TExecRequest& request = query_handle.exec_request();
+  stmt_type = request.stmt_type;
   // Save the query fragments so that the plan can be visualised.
-  for (const TPlanExecInfo& plan_exec_info:
-      query_handle.exec_request().query_exec_request.plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: request.query_exec_request.plan_exec_info) {
     fragments.insert(fragments.end(),
         plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
   }
diff --git a/bin/tsan-suppressions.txt b/bin/tsan-suppressions.txt
index ab4a30426..580cc1a72 100644
--- a/bin/tsan-suppressions.txt
+++ b/bin/tsan-suppressions.txt
@@ -45,12 +45,5 @@ race:impala::RpcMgrKerberizedTest
 # TODO: IMPALA-9403: Allow TSAN to be set on codegen
 race:impala::HdfsColumnarScanner::ProcessScratchBatchCodegenOrInterpret
 
-# This race condition exists when the Impala UI is refreshed while the frontend planner
-# is updating the TExecRequest in the QueryDriver or the ClientRequestState since it holds
-# a pointer. The implications are the UI displays slightly outdated data until the next
-# refresh. Correcting this condition would require holding a lock during the entire
-# planning phase.
-# The SimultaneousMultipleQueriesOneSession test within internal-server-test.cc surfaced
-# this data race. Since this test runs multiple threads, it is not the test function
-# itself but rather the threads it launches that cause the data race .
-race:impala::internalservertest::runTestQueries
+# TODO: IMPALA-12757: TSAN flags lock-order-inversion during internal-server-test
+deadlock:impala::KrpcDataStreamSender::Channel::WaitForRpcLocked
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index f19f532c2..d564fd631 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -617,7 +617,7 @@ struct TConvertTableRequest {
 
 // Result of call to createExecRequest()
 struct TExecRequest {
-  1: required Types.TStmtType stmt_type
+  1: required Types.TStmtType stmt_type = TStmtType.UNKNOWN
 
   // Copied from the corresponding TClientRequest
   2: required Query.TQueryOptions query_options
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index aaecb0602..f36ae556f 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -110,6 +110,7 @@ enum TStmtType {
   ADMIN_FN = 6
   TESTCASE = 7
   CONVERT = 8
+  UNKNOWN = 9
 }
 
 enum TIcebergOperation {