You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/08/15 02:02:33 UTC
[impala] 01/02: IMPALA-10039 (part 2): Fixed Expr-test crash due to
race condition
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit e0681615c2f8f38763437dc9440e8eda42334f5c
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Aug 7 22:52:43 2020 -0700
IMPALA-10039 (part 2): Fixed Expr-test crash due to race condition
The root cause of the crash is that QueryState::Cancel() could be called
before the thread-unsafe function QueryState::Init() had completed.
This patch fixes the race condition between QueryState::Cancel()
and QueryState::Init(). QueryState::Cancel() is now safe to be called
at any time, including before or during Init().
Testing:
- The issue could be reproduced by running expr-test for 10-20
iterations. Verified the fix by running expr-test for over 1000
iterations without a crash.
- Passed TestProcessFailures::test_kill_coordinator.
- Passed core tests.
Change-Id: Ib0d3b9c59924a25b70fa20afeb6e8ca93016eca9
Reviewed-on: http://gerrit.cloudera.org:8080/16313
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
---
be/src/runtime/query-state.cc | 23 ++++++++++++++++++-----
be/src/runtime/query-state.h | 16 ++++++++++++++--
2 files changed, 32 insertions(+), 7 deletions(-)
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3ad9ee0..ce61f99 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -147,11 +147,14 @@ QueryState::~QueryState() {
Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
const TExecPlanFragmentInfo& fragment_info) {
+ std::lock_guard<std::mutex> l(init_lock_);
// Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
// Init() on failure. We need to do this before any returns because Init() always
// returns a resource refcount to its caller.
AcquireBackendResourceRefcount();
+ if (IsCancelled()) return Status::CANCELLED;
+
RETURN_IF_ERROR(DebugAction(query_options(), "QUERY_STATE_INIT"));
ExecEnv* exec_env = ExecEnv::GetInstance();
@@ -225,11 +228,6 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
non_const_fragment_info.fragment_instance_ctxs);
fragment_info_.__isset.fragment_instance_ctxs = true;
- instances_prepared_barrier_.reset(
- new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
- instances_finished_barrier_.reset(
- new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
-
// Claim the query-wide minimum reservation. Do this last so that we don't need
// to handle releasing it if a later step fails.
initial_reservations_ =
@@ -239,6 +237,14 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
query_id(), exec_rpc_params->min_mem_reservation_bytes()));
RETURN_IF_ERROR(InitFilterBank());
scanner_mem_limiter_ = obj_pool_.Add(new ScannerMemLimiter);
+
+ // Create the barriers only on successful initialization; otherwise they
+ // would never be notified.
+ instances_prepared_barrier_.reset(
+ new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
+ instances_finished_barrier_.reset(
+ new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
+ is_initialized_ = true;
return Status::OK();
}
@@ -847,6 +853,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
void QueryState::Cancel() {
VLOG_QUERY << "Cancel: query_id=" << PrintId(query_id());
+ {
+ std::lock_guard<std::mutex> l(init_lock_);
+ if (!is_initialized_) {
+ discard_result(is_cancelled_.CompareAndSwap(0, 1));
+ return;
+ }
+ }
discard_result(WaitForPrepare());
if (!is_cancelled_.CompareAndSwap(0, 1)) return;
if (filter_bank_ != nullptr) filter_bank_->Cancel();
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 575eb55..5f45eff 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -109,7 +109,10 @@ class TRuntimeProfileForest;
/// indicator). If execution ended with an error, that error status will be part of
/// the final report (it will not be overridden by the resulting cancellation).
///
-/// Thread-safe, unless noted otherwise.
+/// Thread-safe Notes:
+/// - Init() must be called first. After Init() returns successfully, all other public
+/// functions are thread-safe to be called unless noted otherwise.
+/// - Cancel() is safe to be called at any time (before or during Init()).
///
/// TODO:
/// - set up kudu clients in Init(), remove related locking
@@ -186,7 +189,7 @@ class QueryState {
/// it to the caller on both success and failure. The caller must release it by
/// calling ReleaseBackendResourceRefcount().
///
- /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
+ /// Uses few cycles; a concurrent Cancel() blocks until Init() returns. Not idempotent.
/// The remaining public functions must be called only after Init().
Status Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
const TExecPlanFragmentInfo& fragment_info) WARN_UNUSED_RESULT;
@@ -216,6 +219,8 @@ class QueryState {
/// Cancels all actively executing fragment instances. Blocks until all fragment
/// instances have finished their Prepare phase. Idempotent.
+ /// For uninitialized QueryState, just set is_cancelled_ and don't need to cancel
+ /// fragment instances.
void Cancel();
/// Return true if the executing fragment instances have been cancelled.
@@ -402,6 +407,13 @@ class QueryState {
ObjectPool obj_pool_;
AtomicInt32 refcnt_;
+ /// Protects 'is_initialized_'.
+ std::mutex init_lock_;
+
+ /// Set as true on successful initialization.
+ /// Protected by 'init_lock_'.
+ bool is_initialized_ = false;
+
/// set to 1 when any fragment instance fails or when Cancel() is called; used to
/// initiate cancellation exactly once
AtomicInt32 is_cancelled_;