You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/08/15 02:02:33 UTC

[impala] 01/02: IMPALA-10039 (part 2): Fixed Expr-test crash due to race condition

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e0681615c2f8f38763437dc9440e8eda42334f5c
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Aug 7 22:52:43 2020 -0700

    IMPALA-10039 (part 2): Fixed Expr-test crash due to race condition
    
    The root cause for crash is that QueryState::Cancel() was called
    before thread unsafe function QueryState::Init() was completed.
    This patch fixes the race condition between QueryState::Cancel()
    and QueryState::Init(). QueryState::Cancel() is now safe to be called
    at any time.
    
    Testing:
     - The issue could be reproduced by running expr-test for 10-20
       iterations. Verified the fix by running expr-test over 1000
       iterations without crash.
     - Passed TestProcessFailures::test_kill_coordinator.
     - Passed core tests.
    
    Change-Id: Ib0d3b9c59924a25b70fa20afeb6e8ca93016eca9
    Reviewed-on: http://gerrit.cloudera.org:8080/16313
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
---
 be/src/runtime/query-state.cc | 23 ++++++++++++++++++-----
 be/src/runtime/query-state.h  | 16 ++++++++++++++--
 2 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3ad9ee0..ce61f99 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -147,11 +147,14 @@ QueryState::~QueryState() {
 
 Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
     const TExecPlanFragmentInfo& fragment_info) {
+  std::lock_guard<std::mutex> l(init_lock_);
   // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
   // Init() on failure. We need to do this before any returns because Init() always
   // returns a resource refcount to its caller.
   AcquireBackendResourceRefcount();
 
+  if (IsCancelled()) return Status::CANCELLED;
+
   RETURN_IF_ERROR(DebugAction(query_options(), "QUERY_STATE_INIT"));
 
   ExecEnv* exec_env = ExecEnv::GetInstance();
@@ -225,11 +228,6 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
       non_const_fragment_info.fragment_instance_ctxs);
   fragment_info_.__isset.fragment_instance_ctxs = true;
 
-  instances_prepared_barrier_.reset(
-      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
-  instances_finished_barrier_.reset(
-      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
-
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
   initial_reservations_ =
@@ -239,6 +237,14 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
       query_id(), exec_rpc_params->min_mem_reservation_bytes()));
   RETURN_IF_ERROR(InitFilterBank());
   scanner_mem_limiter_ = obj_pool_.Add(new ScannerMemLimiter);
+
+  // Set the barriers only after successful initialization. Otherwise they
+  // would never be notified.
+  instances_prepared_barrier_.reset(
+      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
+  instances_finished_barrier_.reset(
+      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
+  is_initialized_ = true;
   return Status::OK();
 }
 
@@ -847,6 +853,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
 
 void QueryState::Cancel() {
   VLOG_QUERY << "Cancel: query_id=" << PrintId(query_id());
+  {
+    std::lock_guard<std::mutex> l(init_lock_);
+    if (!is_initialized_) {
+      discard_result(is_cancelled_.CompareAndSwap(0, 1));
+      return;
+    }
+  }
   discard_result(WaitForPrepare());
   if (!is_cancelled_.CompareAndSwap(0, 1)) return;
   if (filter_bank_ != nullptr) filter_bank_->Cancel();
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 575eb55..5f45eff 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -109,7 +109,10 @@ class TRuntimeProfileForest;
 /// indicator). If execution ended with an error, that error status will be part of
 /// the final report (it will not be overridden by the resulting cancellation).
 ///
-/// Thread-safe, unless noted otherwise.
+/// Thread-safety notes:
+/// - Init() must be called first. After Init() returns successfully, all other public
+///   functions are thread-safe to be called unless noted otherwise.
+/// - Cancel() is safe to be called at any time (before or during Init()).
 ///
 /// TODO:
 /// - set up kudu clients in Init(), remove related locking
@@ -186,7 +189,7 @@ class QueryState {
   /// it to the caller on both success and failure. The caller must release it by
   /// calling ReleaseBackendResourceRefcount().
   ///
-  /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
+  /// Uses few cycles. Blocks a concurrent Cancel() until it returns. Not idempotent.
   /// The remaining public functions must be called only after Init().
   Status Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
       const TExecPlanFragmentInfo& fragment_info) WARN_UNUSED_RESULT;
@@ -216,6 +219,8 @@ class QueryState {
 
   /// Cancels all actively executing fragment instances. Blocks until all fragment
   /// instances have finished their Prepare phase. Idempotent.
+  /// For an uninitialized QueryState, this only sets is_cancelled_; it does not need
+  /// to cancel fragment instances.
   void Cancel();
 
   /// Return true if the executing fragment instances have been cancelled.
@@ -402,6 +407,13 @@ class QueryState {
   ObjectPool obj_pool_;
   AtomicInt32 refcnt_;
 
+  /// Protects 'is_initialized_'.
+  std::mutex init_lock_;
+
+  /// Set as true on successful initialization.
+  /// Protected by 'init_lock_'.
+  bool is_initialized_ = false;
+
   /// set to 1 when any fragment instance fails or when Cancel() is called; used to
   /// initiate cancellation exactly once
   AtomicInt32 is_cancelled_;