You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/26 23:17:43 UTC
(impala) branch master updated: IMPALA-12747: Atomic update of execution state
This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f3ac2ddbf IMPALA-12747: Atomic update of execution state
f3ac2ddbf is described below
commit f3ac2ddbfef0d7cd359b7c9ae47d424791327c6d
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Tue Jan 2 11:32:23 2024 -0800
IMPALA-12747: Atomic update of execution state
QueryDriver owns instances of ClientRequestState and TExecRequest. The
ClientRequestState is used to track execution state of the client-facing
side of a query. TExecRequest encapsulates context about the query
produced by the planner.
When a QueryDriver is created, it creates an instance of
ClientRequestState before planning has run. Previously it also created
an empty TExecRequest and passed a pointer to it to ClientRequestState,
then filled in the content of that TExecRequest when RunFrontendPlanner
was called from ImpalaServer::ExecuteInternal.
Updating TExecRequest was not atomic, so other operations - such as
producing a QueryStateRecord for /queries in the web UI - could read the
content of TExecRequest while it was being updated. This caused TSAN
errors and occasional crashes in internal-server-test, which runs
concurrent requests and examines them through calls to /queries.
Changes ClientRequestState to
- Provide a static placeholder for TExecRequest during creation that
represents an empty context for an UNKNOWN statement type (default
initialized in Thrift).
- Make all references to TExecRequest const so its content cannot be
updated in a non-thread-safe manner.
- Use an AtomicPtr that is updated atomically once the filled
TExecRequest is available.
QueryDriver does not publicly expose access to TExecRequest, so we can
ensure its use is thread-safe without atomics.
ClientRequestState::exec_request() returns a reference to either the
static placeholder or the TExecRequest provided later (which is never
changed afterwards), so the returned reference is always valid for the
lifetime of the ClientRequestState.
Updates user_has_profile_access to be an AtomicBool for the same reason.
Reverts the tsan-suppressions added for IMPALA-12660 so we regain TSAN
coverage. Adds a suppression for a lock-order-inversion bug (IMPALA-12757)
that was uncovered after fixing this data race.
Testing:
- InternalServerTest.SimultaneousMultipleQueriesOneSession previously
failed within ~10 test runs; with this change it ran 90 times without failure.
- Passed TSAN run of backend tests.
Change-Id: I9a967c5c84b6a401f8f5764373f6cd7ee807545f
Reviewed-on: http://gerrit.cloudera.org:8080/20956
Reviewed-by: Jason Fehr <jf...@cloudera.com>
Reviewed-by: Riza Suminto <ri...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/query-driver.cc | 104 +++++++++++++++++------
be/src/runtime/query-driver.h | 18 ++--
be/src/service/client-request-state.cc | 146 +++++++++++++++++----------------
be/src/service/client-request-state.h | 52 +++++++-----
be/src/service/impala-http-handler.cc | 5 +-
be/src/service/impala-server.cc | 55 ++-----------
bin/tsan-suppressions.txt | 11 +--
common/thrift/Frontend.thrift | 2 +-
common/thrift/Types.thrift | 1 +
9 files changed, 209 insertions(+), 185 deletions(-)
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 0a285639b..15e3c0e19 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -29,6 +29,11 @@
#include "common/names.h"
#include "common/thread-debug-info.h"
+// Dumps used for debugging and diffing ExecRequests in text form.
+DEFINE_string(dump_exec_request_path, "",
+ "If set, dump TExecRequest structures to {dump_exec_request_path}/"
+ "TExecRequest-{internal|external}.{query_id.hi}-{query_id.lo}");
+
DECLARE_string(debug_actions);
namespace impala {
@@ -47,51 +52,94 @@ void QueryDriver::CreateClientRequestState(const TQueryCtx& query_ctx,
DCHECK(client_request_state_ == nullptr);
ExecEnv* exec_env = ExecEnv::GetInstance();
lock_guard<SpinLock> l(client_request_state_lock_);
- exec_request_ = make_unique<TExecRequest>();
client_request_state_ =
make_unique<ClientRequestState>(query_ctx, exec_env->frontend(), parent_server_,
- session_state, exec_request_.get(), query_handle->query_driver().get());
+ session_state, query_handle->query_driver().get());
DCHECK(query_handle != nullptr);
(*query_handle).SetClientRequestState(client_request_state_.get());
}
-Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
+void DumpTExecReq(const TExecRequest& exec_request, const char* dump_type,
+ const TUniqueId& query_id) {
+ if (FLAGS_dump_exec_request_path.empty()) return;
+ int depth = 0;
+ std::stringstream tmpstr;
+ string fn(Substitute("$0/TExecRequest-$1.$2", FLAGS_dump_exec_request_path,
+ dump_type, PrintId(query_id, "-")));
+ std::ofstream ofs(fn);
+ tmpstr << exec_request;
+ std::string s = tmpstr.str();
+ const char *p = s.c_str();
+ const int len = s.length();
+ for (int i = 0; i < len; ++i) {
+ const char ch = p[i];
+ ofs << ch;
+ if (ch == '(') {
+ depth++;
+ } else if (ch == ')' && depth > 0) {
+ depth--;
+ } else if (ch == ',') {
+ } else {
+ continue;
+ }
+ ofs << '\n' << std::setw(depth) << " ";
+ }
+}
+
+Status QueryDriver::DoFrontendPlanning(const TQueryCtx& query_ctx, bool use_request) {
// Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for
// this query.
- DCHECK(client_request_state_ != nullptr);
- DCHECK(exec_request_ != nullptr);
+ TExecRequest exec_request;
RETURN_IF_ERROR(
DebugAction(query_ctx.client_request.query_options, "FRONTEND_PLANNER"));
RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
- ExecEnv::GetInstance()->frontend()->GetExecRequest(
- query_ctx, exec_request_.get())));
+ ExecEnv::GetInstance()->frontend()->GetExecRequest(query_ctx, &exec_request)));
+
+ DumpTExecReq(exec_request, "internal", client_request_state_->query_id());
+ if (use_request) exec_request_.reset(new TExecRequest(move(exec_request)));
+ return Status::OK();
+}
+
+Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
+ DCHECK(client_request_state_ != nullptr);
+ DCHECK(exec_request_ == nullptr);
+ RETURN_IF_ERROR(DoFrontendPlanning(query_ctx));
+
+ client_request_state_->SetExecRequest(exec_request_.get());
return Status::OK();
}
Status QueryDriver::SetExternalPlan(
- const TQueryCtx& query_ctx, const TExecRequest& external_exec_request) {
- // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for
- // this query.
+ const TQueryCtx& query_ctx, TExecRequest external_exec_request) {
DCHECK(client_request_state_ != nullptr);
- DCHECK(exec_request_ != nullptr);
+ DCHECK(exec_request_ == nullptr);
+
+ if (!FLAGS_dump_exec_request_path.empty()) {
+ // Create and dump Impala planner results so we can compare with the external plan.
+ RETURN_IF_ERROR(DoFrontendPlanning(query_ctx, false));
+ }
+
RETURN_IF_ERROR(
DebugAction(query_ctx.client_request.query_options, "FRONTEND_PLANNER"));
- *exec_request_.get() = external_exec_request;
// Update query_id in the external request
- exec_request_->query_exec_request.query_ctx.__set_query_id(
+ external_exec_request.query_exec_request.query_ctx.__set_query_id(
client_request_state_->query_id());
// Update coordinator related internal addresses in the external request
- exec_request_->query_exec_request.query_ctx.__set_coord_hostname(
+ external_exec_request.query_exec_request.query_ctx.__set_coord_hostname(
ExecEnv::GetInstance()->configured_backend_address().hostname);
const TNetworkAddress& address =
FromNetworkAddressPB(ExecEnv::GetInstance()->krpc_address());
DCHECK(IsResolvedAddress(address));
- exec_request_->query_exec_request.query_ctx.__set_coord_ip_address(address);
+ external_exec_request.query_exec_request.query_ctx.__set_coord_ip_address(address);
// Update local_time_zone in the external request
- exec_request_->query_exec_request.query_ctx.__set_local_time_zone(
+ external_exec_request.query_exec_request.query_ctx.__set_local_time_zone(
query_ctx.local_time_zone);
- exec_request_->query_exec_request.query_ctx.__set_now_string(
+ external_exec_request.query_exec_request.query_ctx.__set_now_string(
query_ctx.now_string);
+ exec_request_.reset(new TExecRequest(move(external_exec_request)));
+
+ DumpTExecReq(*exec_request_, "external", client_request_state_->query_id());
+ client_request_state_->SetExecRequest(exec_request_.get());
return Status::OK();
}
@@ -126,6 +174,7 @@ void QueryDriver::TryQueryRetry(
const TUniqueId& query_id = client_request_state->query_id();
DCHECK(client_request_state->schedule() != nullptr);
+ DCHECK(exec_request_ != nullptr);
if (exec_request_->query_options.retry_failed_queries) {
lock_guard<mutex> l(*client_request_state->lock());
@@ -396,8 +445,8 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
// query has been retried. Making a copy avoids any race conditions on the
// exec_request_ since the retry_exec_request_ needs to set a new query id on the
// TExecRequest object.
- retry_exec_request_ = make_unique<TExecRequest>(*exec_request_);
- TQueryCtx query_ctx = retry_exec_request_->query_exec_request.query_ctx;
+ unique_ptr<TExecRequest> exec_request = make_unique<TExecRequest>(*exec_request_);
+ TQueryCtx query_ctx = exec_request->query_exec_request.query_ctx;
if (query_ctx.client_request.query_options.spool_all_results_for_retries) {
// Reset this flag in the retry query since we won't retry again, so results can be
// returned immediately.
@@ -413,7 +462,9 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
<< PrintId(client_request_state_->query_id());
}
parent_server_->PrepareQueryContext(&query_ctx);
- retry_exec_request_->query_exec_request.__set_query_ctx(query_ctx);
+ exec_request->query_exec_request.__set_query_ctx(query_ctx);
+ // Move to a const owner to ensure TExecRequest will not be modified after this.
+ retry_exec_request_ = move(exec_request);
ScopedThreadContext tdi_context(GetThreadDebugInfo(), query_ctx.query_id);
@@ -421,14 +472,13 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
ExecEnv* exec_env = ExecEnv::GetInstance();
*retry_request_state =
make_unique<ClientRequestState>(query_ctx, exec_env->frontend(), parent_server_,
- *session, retry_exec_request_.get(), request_state->parent_driver());
+ *session, request_state->parent_driver());
+ (*retry_request_state)->SetExecRequest(retry_exec_request_.get());
(*retry_request_state)->SetOriginalId(request_state->query_id());
- (*retry_request_state)
- ->set_user_profile_access(
- (*retry_request_state)->exec_request().user_has_profile_access);
- if ((*retry_request_state)->exec_request().__isset.result_set_metadata) {
- (*retry_request_state)
- ->set_result_metadata((*retry_request_state)->exec_request().result_set_metadata);
+ (*retry_request_state)->set_user_profile_access(
+ retry_exec_request_->user_has_profile_access);
+ if (retry_exec_request_->__isset.result_set_metadata) {
+ (*retry_request_state)->set_result_metadata(retry_exec_request_->result_set_metadata);
}
}
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index 382aab9f4..b9ee2b3de 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -149,8 +149,8 @@ class QueryDriver {
/// query string (TQueryCtx::TClientRequest::stmt).
Status RunFrontendPlanner(const TQueryCtx& query_ctx) WARN_UNUSED_RESULT;
- /// Similar to RunFrontendPlanner but takes TExecRequest from and external planner
- Status SetExternalPlan(const TQueryCtx& query_ctx, const TExecRequest& exec_request);
+ /// Similar to RunFrontendPlanner but takes TExecRequest from an external planner
+ Status SetExternalPlan(const TQueryCtx& query_ctx, TExecRequest exec_request);
/// Returns the ClientRequestState corresponding to the given query id.
ClientRequestState* GetClientRequestState(const TUniqueId& query_id);
@@ -225,6 +225,12 @@ class QueryDriver {
std::unique_ptr<ClientRequestState>* retry_request_state,
std::shared_ptr<ImpalaServer::SessionState>* session);
+ /// Does the work of RunFrontendPlanner so we can also use it to dump the planner
+ /// result from SetExternalPlan to dump_exec_request_path without redundant work.
+ /// Set use_request to false to skip saving the TExecRequest produced in exec_request_.
+ Status DoFrontendPlanning(const TQueryCtx& query_ctx,
+ bool use_request = true) WARN_UNUSED_RESULT;
+
/// Helper method for handling failures when retrying a query. 'status' is the reason
/// why the retry failed and is expected to be in the error state. Additional details
/// are added to the 'status'. Once the 'status' has been updated, it is set as the
@@ -248,12 +254,12 @@ class QueryDriver {
std::unique_ptr<ClientRequestState> retried_client_request_state_;
/// The TExecRequest for the query. Created in 'CreateClientRequestState' and loaded in
- /// 'RunFrontendPlanner'.
- std::unique_ptr<TExecRequest> exec_request_;
+ /// 'RunFrontendPlanner'. Not thread safe.
+ std::unique_ptr<const TExecRequest> exec_request_;
- /// The TExecRequest for the retried query. Created in
+ /// The TExecRequest for the retried query. Created and initialized in
/// 'CreateRetriedClientRequestState'.
- std::unique_ptr<TExecRequest> retry_exec_request_;
+ std::unique_ptr<const TExecRequest> retry_exec_request_;
/// Thread to process query retry requests. Done in a separate thread to avoid blocking
/// control service RPC threads.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index f1eb16a36..761c12e7e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -92,9 +92,11 @@ static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk
static const string QUERY_STATUS_KEY = "Query Status";
static const string RETRY_STATUS_KEY = "Retry Status";
+const TExecRequest ClientRequestState::unknown_exec_request_;
+
ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend,
ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session,
- TExecRequest* exec_request, QueryDriver* query_driver)
+ QueryDriver* query_driver)
: query_ctx_(query_ctx),
last_active_time_ms_(numeric_limits<int64_t>::max()),
child_query_executor_(new ChildQueryExecutor),
@@ -105,7 +107,6 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro
frontend_profile_(RuntimeProfile::Create(&profile_pool_, "Frontend", false)),
server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer", false)),
summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary", false)),
- exec_request_(exec_request),
frontend_(frontend),
parent_server_(server),
start_time_us_(UnixMicros()),
@@ -233,7 +234,7 @@ Status ClientRequestState::Exec() {
summary_profile_->AddInfoString("Query Options (set by configuration)",
DebugQueryOptions(query_ctx_.client_request.query_options));
summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
- DebugQueryOptions(exec_request_->query_options));
+ DebugQueryOptions(exec_request().query_options));
if (query_ctx_.__isset.overridden_mt_dop_value) {
DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop);
summary_profile_->AddInfoString("MT_DOP limited by admission control",
@@ -242,41 +243,42 @@ Status ClientRequestState::Exec() {
query_ctx_.client_request.query_options.mt_dop));
}
- switch (exec_request_->stmt_type) {
+ const TExecRequest& exec_req = exec_request();
+ switch (exec_req.stmt_type) {
case TStmtType::QUERY:
case TStmtType::DML:
- DCHECK(exec_request_->__isset.query_exec_request);
+ DCHECK(exec_req.__isset.query_exec_request);
RETURN_IF_ERROR(
- ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/));
+ ExecQueryOrDmlRequest(exec_req.query_exec_request, true /*async*/));
break;
case TStmtType::EXPLAIN: {
request_result_set_.reset(new vector<TResultRow>(
- exec_request_->explain_result.results));
+ exec_req.explain_result.results));
break;
}
case TStmtType::TESTCASE: {
- DCHECK(exec_request_->__isset.testcase_data_path);
- SetResultSet(vector<string>(1, exec_request_->testcase_data_path));
+ DCHECK(exec_req.__isset.testcase_data_path);
+ SetResultSet(vector<string>(1, exec_req.testcase_data_path));
break;
}
case TStmtType::DDL: {
- DCHECK(exec_request_->__isset.catalog_op_request);
+ DCHECK(exec_req.__isset.catalog_op_request);
LOG_AND_RETURN_IF_ERROR(ExecDdlRequest());
break;
}
case TStmtType::LOAD: {
- DCHECK(exec_request_->__isset.load_data_request);
+ DCHECK(exec_req.__isset.load_data_request);
LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest());
break;
}
case TStmtType::SET: {
- DCHECK(exec_request_->__isset.set_query_option_request);
+ DCHECK(exec_req.__isset.set_query_option_request);
lock_guard<mutex> l(session_->lock);
- if (exec_request_->set_query_option_request.__isset.key) {
+ if (exec_req.set_query_option_request.__isset.key) {
// "SET key=value" updates the session query options.
- DCHECK(exec_request_->set_query_option_request.__isset.value);
- const auto& key = exec_request_->set_query_option_request.key;
- const auto& value = exec_request_->set_query_option_request.value;
+ DCHECK(exec_req.set_query_option_request.__isset.value);
+ const auto& key = exec_req.set_query_option_request.key;
+ const auto& value = exec_req.set_query_option_request.value;
RETURN_IF_ERROR(SetQueryOption(key, value, &session_->set_query_options,
&session_->set_query_options_mask));
SetResultSet({}, {}, {});
@@ -287,8 +289,8 @@ Status ClientRequestState::Exec() {
VLOG_QUERY << "ClientRequestState::Exec() SET: idle_session_timeout="
<< PrettyPrinter::Print(session_->session_timeout, TUnit::TIME_S);
}
- } else if (exec_request_->set_query_option_request.__isset.query_option_type
- && exec_request_->set_query_option_request.query_option_type
+ } else if (exec_req.set_query_option_request.__isset.query_option_type
+ && exec_req.set_query_option_request.query_option_type
== TQueryOptionType::UNSET_ALL) {
// "UNSET ALL"
RETURN_IF_ERROR(ResetAllQueryOptions(
@@ -297,25 +299,26 @@ Status ClientRequestState::Exec() {
} else {
// "SET" or "SET ALL"
bool is_set_all =
- exec_request_->set_query_option_request.__isset.query_option_type
- && exec_request_->set_query_option_request.query_option_type
+ exec_req.set_query_option_request.__isset.query_option_type
+ && exec_req.set_query_option_request.query_option_type
== TQueryOptionType::SET_ALL;
PopulateResultForSet(is_set_all);
}
break;
}
case TStmtType::ADMIN_FN:
- DCHECK(exec_request_->admin_request.type == TAdminRequestType::SHUTDOWN);
+ DCHECK(exec_req.admin_request.type == TAdminRequestType::SHUTDOWN);
RETURN_IF_ERROR(ExecShutdownRequest());
break;
case TStmtType::CONVERT:
- DCHECK(exec_request_->__isset.convert_table_request);
+ DCHECK(exec_req.__isset.convert_table_request);
LOG_AND_RETURN_IF_ERROR(ExecMigrateRequest());
break;
+ case TStmtType::UNKNOWN:
+ DCHECK(false);
+ return Status("Exec request uninitialized during execution");
default:
- stringstream errmsg;
- errmsg << "Unknown exec request stmt type: " << exec_request_->stmt_type;
- return Status(errmsg.str());
+ return Status(Substitute("Unknown exec request stmt type: $0", exec_req.stmt_type));
}
if (async_exec_thread_.get() == nullptr) {
@@ -351,7 +354,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
switch (catalog_op.op_type) {
case TCatalogOpType::USE: {
lock_guard<mutex> l(session_->lock);
- session_->database = exec_request_->catalog_op_request.use_db_params.db;
+ session_->database = exec_request().catalog_op_request.use_db_params.db;
return Status::OK();
}
case TCatalogOpType::SHOW_TABLES:
@@ -585,12 +588,13 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
}
void ClientRequestState::FinishExecQueryOrDmlRequest() {
- DCHECK(exec_request_->__isset.query_exec_request);
+ const TExecRequest& exec_req = exec_request();
+ DCHECK(exec_req.__isset.query_exec_request);
UniqueIdPB query_id_pb;
TUniqueIdToUniqueIdPB(query_id(), &query_id_pb);
Status admit_status = admission_control_client_->SubmitForAdmission(
{query_id_pb, ExecEnv::GetInstance()->backend_id(),
- exec_request_->query_exec_request, exec_request_->query_options,
+ exec_req.query_exec_request, exec_req.query_options,
summary_profile_, blacklisted_executor_addresses_},
query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_);
{
@@ -602,15 +606,15 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
// startup. The query was not cancelled right before being admitted and the window here
// is small enough to not require special handling. Instead we start the query and then
// cancel it through the check below if necessary.
- DebugActionNoFail(exec_request_->query_options, "CRS_BEFORE_COORD_STARTS");
+ DebugActionNoFail(exec_req.query_options, "CRS_BEFORE_COORD_STARTS");
// Register the query with the server to support cancellation. This happens after
// admission because now the set of executors is fixed and an executor failure will
// cause a query failure.
parent_server_->RegisterQueryLocations(schedule_->backend_exec_params(), query_id());
- coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
+ coord_.reset(new Coordinator(this, exec_req, *schedule_.get(), query_events_));
Status exec_status = coord_->Exec();
- DebugActionNoFail(exec_request_->query_options, "CRS_AFTER_COORD_STARTS");
+ DebugActionNoFail(exec_req.query_options, "CRS_AFTER_COORD_STARTS");
// Make coordinator profile visible, even upon failure.
if (coord_->query_profile() != nullptr) profile_->AddChild(coord_->query_profile());
@@ -646,14 +650,14 @@ Status ClientRequestState::ExecDdlRequestImplSync() {
if (catalog_op_type() != TCatalogOpType::DDL &&
catalog_op_type() != TCatalogOpType::RESET_METADATA) {
- Status status = ExecLocalCatalogOp(exec_request_->catalog_op_request);
+ Status status = ExecLocalCatalogOp(exec_request().catalog_op_request);
lock_guard<mutex> l(lock_);
return UpdateQueryStatus(status);
}
if (ddl_type() == TDdlType::COMPUTE_STATS) {
- TComputeStatsParams& compute_stats_params =
- exec_request_->catalog_op_request.ddl_params.compute_stats_params;
+ const TComputeStatsParams& compute_stats_params =
+ exec_request().catalog_op_request.ddl_params.compute_stats_params;
RuntimeProfile* child_profile =
RuntimeProfile::Create(&profile_pool_, "Child Queries");
profile_->AddChild(child_profile);
@@ -689,6 +693,7 @@ Status ClientRequestState::ExecDdlRequestImplSync() {
void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
bool is_CTAS = (catalog_op_type() == TCatalogOpType::DDL
&& ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
+ const TExecRequest& exec_req = exec_request();
catalog_op_executor_.reset(
new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
@@ -705,9 +710,9 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
}
// Optionally wait with a debug action before Exec() below.
- DebugActionNoFail(exec_request_->query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
+ DebugActionNoFail(exec_req.query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
- Status status = catalog_op_executor_->Exec(exec_request_->catalog_op_request);
+ Status status = catalog_op_executor_->Exec(exec_req.catalog_op_request);
query_events_->MarkEvent("CatalogDdlRequest finished");
{
lock_guard<mutex> l(lock_);
@@ -729,7 +734,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
if (catalog_op_type() == TCatalogOpType::DDL &&
ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
!catalog_op_executor_->ddl_exec_response()->new_table_created) {
- DCHECK(exec_request_->catalog_op_request.
+ DCHECK(exec_req.catalog_op_request.
ddl_params.create_table_params.if_not_exists);
return;
}
@@ -737,7 +742,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
// Add newly created table to catalog cache.
status = parent_server_->ProcessCatalogUpdateResult(
*catalog_op_executor_->update_catalog_result(),
- exec_request_->query_options.sync_ddl, query_options(), query_events_);
+ exec_req.query_options.sync_ddl, query_options(), query_events_);
{
lock_guard<mutex> l(lock_);
RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
@@ -748,9 +753,9 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
// like a normal DML request. As with other DML requests, it will
// wait for another catalog update if any partitions were altered as a result
// of the operation.
- DCHECK(exec_request_->__isset.query_exec_request);
+ DCHECK(exec_req.__isset.query_exec_request);
RETURN_VOID_IF_ERROR(
- ExecQueryOrDmlRequest(exec_request_->query_exec_request, !exec_in_worker_thread));
+ ExecQueryOrDmlRequest(exec_req.query_exec_request, !exec_in_worker_thread));
}
// Set the results to be reported to the client. Do this under lock to avoid races
@@ -778,7 +783,7 @@ Status ClientRequestState::ExecDdlRequest() {
string op_type = catalog_op_type() == TCatalogOpType::DDL ?
PrintValue(ddl_type()) : PrintValue(catalog_op_type());
bool async_ddl = ShouldRunExecDdlAsync();
- bool async_ddl_enabled = exec_request_->query_options.enable_async_ddl_execution;
+ bool async_ddl_enabled = exec_request().query_options.enable_async_ddl_execution;
string exec_mode = (async_ddl && async_ddl_enabled) ? "asynchronous" : "synchronous";
summary_profile_->AddInfoString("DDL Type", op_type);
@@ -802,17 +807,18 @@ Status ClientRequestState::ExecDdlRequest() {
}
void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
+ const TExecRequest& exec_req = exec_request();
if (exec_in_worker_thread) {
VLOG_QUERY << "Running in worker thread";
DCHECK(exec_state() == ExecState::PENDING);
UpdateNonErrorExecState(ExecState::RUNNING);
}
DebugActionNoFail(
- exec_request_->query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
+ exec_req.query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
TLoadDataResp response;
- Status status = frontend_->LoadData(exec_request_->load_data_request, &response);
- if (exec_request_->load_data_request.iceberg_tbl) {
+ Status status = frontend_->LoadData(exec_req.load_data_request, &response);
+ if (exec_req.load_data_request.iceberg_tbl) {
ExecLoadIcebergDataRequestImpl(response);
}
{
@@ -832,11 +838,11 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
// The partition_name is an empty string for unpartitioned tables.
catalog_update.updated_partitions[response.partition_name] = updatedPartition;
- catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+ catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
catalog_update.__set_header(GetCatalogServiceRequestHeader());
- catalog_update.target_table = exec_request_->load_data_request.table_name.table_name;
- catalog_update.db_name = exec_request_->load_data_request.table_name.db_name;
- catalog_update.is_overwrite = exec_request_->load_data_request.overwrite;
+ catalog_update.target_table = exec_req.load_data_request.table_name.table_name;
+ catalog_update.db_name = exec_req.load_data_request.table_name.db_name;
+ catalog_update.is_overwrite = exec_req.load_data_request.overwrite;
CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(), &status);
@@ -855,7 +861,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
status = parent_server_->ProcessCatalogUpdateResult(
resp.result,
- exec_request_->query_options.sync_ddl, query_options(), query_events_);
+ exec_req.query_options.sync_ddl, query_options(), query_events_);
{
lock_guard<mutex> l(lock_);
RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
@@ -863,7 +869,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
}
void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) {
- TLoadDataReq load_data_req = exec_request_->load_data_request;
+ TLoadDataReq load_data_req = exec_request().load_data_request;
RuntimeProfile* child_profile =
RuntimeProfile::Create(&profile_pool_, "Child Queries");
profile_->AddChild(child_profile);
@@ -934,7 +940,7 @@ void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response)
Status ClientRequestState::ExecLoadDataRequest() {
- if (exec_request_->query_options.enable_async_load_data_execution) {
+ if (exec_request().query_options.enable_async_load_data_execution) {
// Transition the exec state out of INITIALIZED to PENDING to make available the
// runtime profile for the DDL.
UpdateNonErrorExecState(ExecState::PENDING);
@@ -948,7 +954,7 @@ Status ClientRequestState::ExecLoadDataRequest() {
}
Status ClientRequestState::ExecShutdownRequest() {
- const TShutdownParams& request = exec_request_->admin_request.shutdown_params;
+ const TShutdownParams& request = exec_request().admin_request.shutdown_params;
bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
int port = backend_port_specified ? request.backend.port : FLAGS_krpc_port;
// Use the local shutdown code path if the host is unspecified or if it exactly matches
@@ -1184,7 +1190,7 @@ void ClientRequestState::Wait() {
Status ClientRequestState::WaitInternal() {
// Explain requests have already populated the result set. Nothing to do here.
- if (exec_request_->stmt_type == TStmtType::EXPLAIN) {
+ if (exec_request().stmt_type == TStmtType::EXPLAIN) {
MarkInactive();
return Status::OK();
}
@@ -1509,22 +1515,23 @@ void ClientRequestState::Cancel(const Status* cause, bool wait_until_finalized)
}
Status ClientRequestState::UpdateCatalog() {
- if (!exec_request_->__isset.query_exec_request ||
- exec_request_->query_exec_request.stmt_type != TStmtType::DML) {
+ const TExecRequest& exec_req = exec_request();
+ if (!exec_req.__isset.query_exec_request ||
+ exec_req.query_exec_request.stmt_type != TStmtType::DML) {
return Status::OK();
}
query_events_->MarkEvent("DML data written");
SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
- const TQueryExecRequest& query_exec_request = exec_request_->query_exec_request;
+ const TQueryExecRequest& query_exec_request = exec_req.query_exec_request;
if (query_exec_request.__isset.finalize_params) {
const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
TUpdateCatalogRequest catalog_update;
- catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+ catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
catalog_update.__set_header(GetCatalogServiceRequestHeader());
- if (exec_request_->query_options.__isset.debug_action) {
- catalog_update.__set_debug_action(exec_request_->query_options.debug_action);
+ if (exec_req.query_options.__isset.debug_action) {
+ catalog_update.__set_debug_action(exec_req.query_options.debug_action);
}
DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update)) {
@@ -1587,7 +1594,7 @@ Status ClientRequestState::UpdateCatalog() {
query_events_->MarkEvent("Transaction committed");
}
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
- exec_request_->query_options.sync_ddl, query_options(), query_events_));
+ exec_req.query_options.sync_ddl, query_options(), query_events_));
}
} else if (InKuduTransaction()) {
// Commit the Kudu transaction. Clear transaction state if it's successful.
@@ -1768,9 +1775,10 @@ Status ClientRequestState::UpdateTableAndColumnStats(
col_stats_data = child_queries[1]->result_data();
}
+ const TExecRequest& exec_req = exec_request();
Status status = catalog_op_executor_->ExecComputeStats(
GetCatalogServiceRequestHeader(),
- exec_request_->catalog_op_request,
+ exec_req.catalog_op_request,
child_queries[0]->result_schema(),
child_queries[0]->result_data(),
col_stats_schema,
@@ -1781,7 +1789,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
}
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
*catalog_op_executor_->update_catalog_result(),
- exec_request_->query_options.sync_ddl, query_options(), query_events_));
+ exec_req.query_options.sync_ddl, query_options(), query_events_));
// Set the results to be reported to the client.
SetResultSet(catalog_op_executor_->ddl_exec_response());
@@ -1909,11 +1917,11 @@ void ClientRequestState::UpdateEndTime() {
int64_t ClientRequestState::GetTransactionId() const {
DCHECK(InTransaction());
- return exec_request_->query_exec_request.finalize_params.transaction_id;
+ return exec_request().query_exec_request.finalize_params.transaction_id;
}
bool ClientRequestState::InTransaction() const {
- return exec_request_->query_exec_request.finalize_params.__isset.transaction_id &&
+ return exec_request().query_exec_request.finalize_params.__isset.transaction_id &&
!transaction_closed_;
}
@@ -1935,8 +1943,7 @@ void ClientRequestState::ClearTransactionState() {
bool ClientRequestState::InKuduTransaction() const {
// If Kudu transaction is opened, TQueryExecRequest.query_ctx.is_kudu_transactional
// is set as true by Frontend.doCreateExecRequest().
- DCHECK(exec_request_ != nullptr);
- return (exec_request_->query_exec_request.query_ctx.is_kudu_transactional
+ return (exec_request().query_exec_request.query_ctx.is_kudu_transactional
&& !transaction_closed_);
}
@@ -1955,7 +1962,7 @@ Status ClientRequestState::CommitKuduTransaction() {
DCHECK(InKuduTransaction());
// Skip calling Commit() for Kudu Transaction with a debug action so that test code
// could explicitly control over calling Commit().
- Status status = DebugAction(exec_request_->query_options, "CRS_NOT_COMMIT_KUDU_TXN");
+ Status status = DebugAction(exec_request().query_options, "CRS_NOT_COMMIT_KUDU_TXN");
if (UNLIKELY(!status.ok())) {
VLOG(1) << Substitute("Skip to commit Kudu transaction with query-id: $0",
PrintId(query_ctx_.query_id));
@@ -1988,6 +1995,7 @@ void ClientRequestState::LogQueryEvents() {
case TStmtType::QUERY:
case TStmtType::DML:
case TStmtType::DDL:
+ case TStmtType::UNKNOWN:
log_events = status.ok();
break;
case TStmtType::EXPLAIN:
@@ -2255,7 +2263,7 @@ Status ClientRequestState::ExecMigrateRequest() {
void ClientRequestState::ExecMigrateRequestImpl() {
// A convert table request holds the query strings for the sub-queries. These are
// populated by ConvertTableToIcebergStmt in the Frontend during analysis.
- TConvertTableRequest& params = exec_request_->convert_table_request;
+ const TConvertTableRequest& params = exec_request().convert_table_request;
{
RuntimeProfile* child_profile =
RuntimeProfile::Create(&profile_pool_, "Child Queries 1");
@@ -2295,7 +2303,7 @@ void ClientRequestState::ExecMigrateRequestImpl() {
}
}
// Create an external Iceberg table using the data of the HDFS table.
- Status status = frontend_->Convert(*exec_request_);
+ Status status = frontend_->Convert(exec_request());
if (!status.ok()) AddTableResetHints(params, &status);
{
lock_guard<mutex> l(lock_);
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 459fce310..a9f8c1912 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -78,8 +78,7 @@ class QuerySchedulePB;
class ClientRequestState {
public:
ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend, ImpalaServer* server,
- std::shared_ptr<ImpalaServer::SessionState> session, TExecRequest* exec_request,
- QueryDriver* query_driver);
+ std::shared_ptr<ImpalaServer::SessionState> session, QueryDriver* query_driver);
~ClientRequestState();
@@ -87,6 +86,12 @@ class ClientRequestState {
enum class RetryState { RETRYING, RETRIED, NOT_RETRIED };
+ /// Initialize TExecRequest. Prior to this, exec_request() returns a place-holder.
+ void SetExecRequest(const TExecRequest* exec_request) {
+ DCHECK(exec_request_.Load()->stmt_type == TStmtType::UNKNOWN);
+ exec_request_.Store(exec_request);
+ }
+
/// Sets the profile that is produced by the frontend. The frontend creates the
/// profile during planning and returns it to the backend via TExecRequest,
/// which then sets the frontend profile.
@@ -234,7 +239,7 @@ class ClientRequestState {
/// Queries are run and authorized on behalf of the effective_user.
const std::string& effective_user() const;
const std::string& connected_user() const { return query_ctx_.session.connected_user; }
- bool user_has_profile_access() const { return user_has_profile_access_; }
+ bool user_has_profile_access() const { return user_has_profile_access_.Load(); }
const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
TSessionType::type session_type() const { return query_ctx_.session.session_type; }
const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
@@ -253,14 +258,15 @@ class ClientRequestState {
/// control.
/// Admission control resource pool associated with this query.
std::string request_pool() const {
- if (is_planning_done_.load() && exec_request_ != nullptr
- && exec_request_->query_exec_request.query_ctx.__isset.request_pool) {
- // If the request pool has been set by Planner, return the request pool selected
- // by Planner.
- return exec_request_->query_exec_request.query_ctx.request_pool;
- } else {
- return query_ctx_.__isset.request_pool ? query_ctx_.request_pool : "";
+ if (is_planning_done_.load()) {
+ const TExecRequest& exec_req = exec_request();
+ if (exec_req.query_exec_request.query_ctx.__isset.request_pool) {
+ // If the request pool has been set by Planner, return the request pool selected
+ // by Planner.
+ return exec_req.query_exec_request.query_ctx.request_pool;
+ }
}
+ return query_ctx_.__isset.request_pool ? query_ctx_.request_pool : "";
}
int num_rows_fetched() const { return num_rows_fetched_; }
@@ -270,18 +276,18 @@ class ClientRequestState {
const TResultSetMetadata* result_metadata() const { return &result_metadata_; }
const TUniqueId& query_id() const { return query_ctx_.query_id; }
/// Returns the TExecRequest for the query associated with this ClientRequestState.
- /// Contents are only valid after InitExecRequest(TQueryCtx) initializes the
+ /// Contents are a place-holder until SetExecRequest() initializes the
/// TExecRequest.
const TExecRequest& exec_request() const {
- DCHECK(exec_request_ != nullptr);
- return *exec_request_;
+ DCHECK(exec_request_.Load() != nullptr);
+ return *exec_request_.Load();
}
- TStmtType::type stmt_type() const { return exec_request_->stmt_type; }
+ TStmtType::type stmt_type() const { return exec_request().stmt_type; }
TCatalogOpType::type catalog_op_type() const {
- return exec_request_->catalog_op_request.op_type;
+ return exec_request().catalog_op_request.op_type;
}
TDdlType::type ddl_type() const {
- return exec_request_->catalog_op_request.ddl_params.ddl_type;
+ return exec_request().catalog_op_request.ddl_params.ddl_type;
}
std::mutex* lock() { return &lock_; }
std::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
@@ -302,7 +308,7 @@ class ClientRequestState {
const Status& query_status() const { return query_status_; }
void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
void set_user_profile_access(bool user_has_profile_access) {
- user_has_profile_access_ = user_has_profile_access;
+ user_has_profile_access_.Store(user_has_profile_access);
}
const RuntimeProfile* profile() const { return profile_; }
const RuntimeProfile* summary_profile() const { return summary_profile_; }
@@ -316,7 +322,7 @@ class ClientRequestState {
TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
const std::vector<std::string>& GetAnalysisWarnings() const {
- return exec_request_->analysis_warnings;
+ return exec_request().analysis_warnings;
}
inline int64_t last_active_ms() const {
@@ -660,14 +666,16 @@ class ClientRequestState {
/// UpdateQueryStatus(Status) or MarkAsRetrying(Status).
Status query_status_;
+ /// Default TExecRequest to return until SetExecRequest is called.
+ static const TExecRequest unknown_exec_request_;
+
/// The TExecRequest for the query tracked by this ClientRequestState. The TExecRequest
- /// is initialized in QueryDriver::RunFrontendPlanner(TQueryCtx).The TExecRequest is
- /// owned by the parent QueryDriver.
- TExecRequest* exec_request_;
+ /// is stored by SetExecRequest() once finalized. It's owned by the parent QueryDriver.
+ AtomicPtr<const TExecRequest> exec_request_{&unknown_exec_request_};
/// If true, effective_user() has access to the runtime profile and execution
/// summary.
- bool user_has_profile_access_ = true;
+ AtomicBool user_has_profile_access_{true};
TResultSetMetadata result_metadata_; // metadata for select query
int num_rows_fetched_ = 0; // number of rows fetched by client for the entire query
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b17afb383..79de37006 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -1206,14 +1206,15 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
lock_guard<mutex> l(*(*query_handle).lock());
query_status = query_handle->query_status();
stmt = query_handle->sql_stmt();
- plan = query_handle->exec_request().query_exec_request.query_plan;
+ const TExecRequest& exec_request = query_handle->exec_request();
+ plan = exec_request.query_exec_request.query_plan;
if ((include_json_plan || include_summary)
&& query_handle->GetCoordinator() != nullptr) {
query_handle->GetCoordinator()->GetTExecSummary(&summary);
}
if (include_json_plan) {
for (const TPlanExecInfo& plan_exec_info:
- query_handle->exec_request().query_exec_request.plan_exec_info) {
+ exec_request.query_exec_request.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
fragments.push_back(fragment);
}
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 21b778302..6217956fb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -349,11 +349,6 @@ DEFINE_int32(query_event_hook_nthreads, 1, "Number of threads to use for "
"QueryEventHook execution. If this number is >1 then hooks will execute "
"concurrently.");
-// Dumps used for debugging and diffing ExecRequests in text form.
-DEFINE_string(dump_exec_request_path, "",
- "If set, dump TExecRequest structures to {dump_exec_request_path}/"
- "TExecRequest-{internal|external}.{query_id.hi}-{query_id.lo}");
-
DECLARE_bool(compact_catalog_topic);
DEFINE_bool(use_local_tz_for_unix_timestamp_conversions, false,
@@ -1256,33 +1251,6 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> sess
return status;
}
-void DumpTExecReq(const TExecRequest& exec_request, const char* dump_type,
- const TUniqueId& query_id) {
- if (FLAGS_dump_exec_request_path.empty()) return;
- int depth = 0;
- std::stringstream tmpstr;
- string fn(Substitute("$0/TExecRequest-$1.$2", FLAGS_dump_exec_request_path,
- dump_type, PrintId(query_id, "-")));
- std::ofstream ofs(fn);
- tmpstr << exec_request;
- std::string s = tmpstr.str();
- const char *p = s.c_str();
- const int len = s.length();
- for (int i = 0; i < len; ++i) {
- const char ch = p[i];
- ofs << ch;
- if (ch == '(') {
- depth++;
- } else if (ch == ')' && depth > 0) {
- depth--;
- } else if (ch == ',') {
- } else {
- continue;
- }
- ofs << '\n' << std::setw(depth) << " ";
- }
-}
-
Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state,
bool* registered_query, QueryHandle* query_handle) {
@@ -1329,17 +1297,6 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
statement_length, max_statement_length));
}
- Status exec_status = Status::OK();
- TUniqueId query_id = (*query_handle)->query_id();
- // Generate TExecRequest here if one was not passed in or we want one
- // from the Impala planner to compare with
- if (!is_external_req || !FLAGS_dump_exec_request_path.empty()) {
- // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest
- // for this query.
- RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
- DumpTExecReq((*query_handle)->exec_request(), "internal", query_id);
- }
-
if (is_external_req) {
// Use passed in exec_request
RETURN_IF_ERROR(query_handle->query_driver()->SetExternalPlan(
@@ -1349,8 +1306,9 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
RETURN_IF_ERROR(exec_env_->frontend()->addTransaction(
external_exec_request->query_exec_request.query_ctx));
}
- exec_status = Status::OK();
- DumpTExecReq((*query_handle)->exec_request(), "external", query_id);
+ } else {
+ // No TExecRequest was passed in; run the frontend planner to generate one.
+ RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
}
const TExecRequest& result = (*query_handle)->exec_request();
@@ -2531,12 +2489,10 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& query
void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle) {
id = query_handle.query_id();
- const TExecRequest& request = query_handle.exec_request();
const string* plan_str = query_handle.summary_profile()->GetInfoString("Plan");
if (plan_str != nullptr) plan = *plan_str;
stmt = query_handle.sql_stmt();
- stmt_type = request.stmt_type;
effective_user = query_handle.effective_user();
default_db = query_handle.default_db();
start_time_us = query_handle.start_time_us();
@@ -2579,9 +2535,10 @@ void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle
query_handle.query_events()->ToThrift(&event_sequence);
+ const TExecRequest& request = query_handle.exec_request();
+ stmt_type = request.stmt_type;
// Save the query fragments so that the plan can be visualised.
- for (const TPlanExecInfo& plan_exec_info:
- query_handle.exec_request().query_exec_request.plan_exec_info) {
+ for (const TPlanExecInfo& plan_exec_info: request.query_exec_request.plan_exec_info) {
fragments.insert(fragments.end(),
plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
}
diff --git a/bin/tsan-suppressions.txt b/bin/tsan-suppressions.txt
index ab4a30426..580cc1a72 100644
--- a/bin/tsan-suppressions.txt
+++ b/bin/tsan-suppressions.txt
@@ -45,12 +45,5 @@ race:impala::RpcMgrKerberizedTest
# TODO: IMPALA-9403: Allow TSAN to be set on codegen
race:impala::HdfsColumnarScanner::ProcessScratchBatchCodegenOrInterpret
-# This race condition exists when the Impala UI is refreshed while the frontend planner
-# is updating the TExecRequest in the QueryDriver or the ClientRequestState since it holds
-# a pointer. The implications are the UI displays slightly outdated data until the next
-# refresh. Correcting this condition would require holding a lock during the entire
-# planning phase.
-# The SimultaneousMultipleQueriesOneSession test within internal-server-test.cc surfaced
-# this data race. Since this test runs multiple threads, it is not the test function
-# itself but rather the threads it launches that cause the data race .
-race:impala::internalservertest::runTestQueries
+# TODO: IMPALA-12757: TSAN flags lock-order-inversion during internal-server-test
+deadlock:impala::KrpcDataStreamSender::Channel::WaitForRpcLocked
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index f19f532c2..d564fd631 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -617,7 +617,7 @@ struct TConvertTableRequest {
// Result of call to createExecRequest()
struct TExecRequest {
- 1: required Types.TStmtType stmt_type
+ 1: required Types.TStmtType stmt_type = TStmtType.UNKNOWN
// Copied from the corresponding TClientRequest
2: required Query.TQueryOptions query_options
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index aaecb0602..f36ae556f 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -110,6 +110,7 @@ enum TStmtType {
ADMIN_FN = 6
TESTCASE = 7
CONVERT = 8
+ UNKNOWN = 9
}
enum TIcebergOperation {