You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/04/08 03:40:25 UTC

[impala] 01/02: IMPALA-9611: fix hang when cancelling join builder

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7fd046d00f9cd3106ff2c617c53ab6d183920785
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sun Apr 5 12:44:53 2020 -0700

    IMPALA-9611: fix hang when cancelling join builder
    
    The error could occur in the following scenario, where
    thread A is executing a join build fragment and thread
    B is cancelling the fragment instance.
    1. Thread A is in HandoffToProbesAndWait(), reads is_cancelled_
      and sees false.
    2. Thread B in RuntimeState::Cancel() sets is_cancelled_ = true,
      acquires cancellation_cvs_lock_, then calls NotifyAll() on the
      condition variable.
    3. Thread A calls Wait() on the condition variable and blocks forever
      because the NotifyAll() from step 2 has already happened.
    
    The fix is for thread B to acquire the lock that thread A is
    holding before it signals. That prevents the race because #1 and #3
    above are in the same critical section, so thread B won't be able to
    signal the condition variable until thread A has released the lock.
    
    Testing:
    Added metric check to test_failpoints to make it easier to detect
    hangs caused by those tests in future.
    
    Looped test_failpoints.py overnight, which was previously enough
    to reproduce the failure within a couple of hours.
    
    Ran exhaustive tests.
    
    Change-Id: I996ad2055d6542eb57e12c663b89de5f84208f77
    Reviewed-on: http://gerrit.cloudera.org:8080/15672
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/join-builder.cc      |  4 ++--
 be/src/runtime/runtime-state.cc  | 20 +++++++++++++++-----
 be/src/runtime/runtime-state.h   | 21 ++++++++++++++++-----
 tests/failure/test_failpoints.py |  7 +++++++
 4 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc
index 880dbd1..e98b084 100644
--- a/be/src/exec/join-builder.cc
+++ b/be/src/exec/join-builder.cc
@@ -67,7 +67,7 @@ void JoinBuilder::CloseFromProbe(RuntimeState* join_node_state) {
 
 Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) {
   DCHECK(is_separate_build_);
-  join_node_state->AddCancellationCV(&probe_wakeup_cv_);
+  join_node_state->AddCancellationCV(&separate_build_lock_, &probe_wakeup_cv_);
   VLOG(2) << "JoinBuilder (id=" << join_node_id_ << ")"
           << " WaitForInitialBuild() called by finstance "
           << PrintId(join_node_state->fragment_instance_id());
@@ -97,7 +97,7 @@ Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) {
 void JoinBuilder::HandoffToProbesAndWait(RuntimeState* build_side_state) {
   DCHECK(is_separate_build_) << "Doesn't make sense for embedded builder.";
   VLOG(2) << "Initial build ready JoinBuilder (id=" << join_node_id_ << ")";
-  build_side_state->AddCancellationCV(&build_wakeup_cv_);
+  build_side_state->AddCancellationCV(&separate_build_lock_, &build_wakeup_cv_);
   {
     unique_lock<mutex> l(separate_build_lock_);
     ready_to_probe_ = true;
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 4cb68e3..d314906 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -229,20 +229,30 @@ void RuntimeState::Cancel() {
   is_cancelled_.Store(true);
   {
     lock_guard<SpinLock> l(cancellation_cvs_lock_);
-    for (ConditionVariable* cv : cancellation_cvs_) cv->NotifyAll();
+    for (pair<std::mutex*, ConditionVariable*>& entry : cancellation_cvs_) {
+      // Acquire the lock to prevent races between readers of 'is_cancelled_' and this
+      // writing thread (e.g. IMPALA-9611) - the caller should read 'is_cancelled_' while
+      // holding the lock. Drop it before signalling the CV so that a blocked thread can
+      // immediately acquire the mutex when it wakes up.
+      {
+        lock_guard<mutex> l(*entry.first);
+      }
+      entry.second->NotifyAll();
+    }
     for (CyclicBarrier* cb : cancellation_cbs_) {
       cb->Cancel(Status::CancelledInternal("RuntimeState::Cancel()"));
     }
   }
+
 }
 
-void RuntimeState::AddCancellationCV(ConditionVariable* cv) {
+void RuntimeState::AddCancellationCV(mutex* mutex, ConditionVariable* cv) {
   lock_guard<SpinLock> l(cancellation_cvs_lock_);
-  for (ConditionVariable* cv2 : cancellation_cvs_) {
+  for (pair<std::mutex*, ConditionVariable*>& entry : cancellation_cvs_) {
     // Don't add if already present.
-    if (cv == cv2) return;
+    if (mutex == entry.first && cv == entry.second) return;
   }
-  cancellation_cvs_.push_back(cv);
+  cancellation_cvs_.push_back(make_pair(mutex, cv));
 }
 
 void RuntimeState::AddBarrierToCancel(CyclicBarrier* cb) {
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 102db23..91e500e 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -19,10 +19,12 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H
 #define IMPALA_RUNTIME_RUNTIME_STATE_H
 
-#include <boost/scoped_ptr.hpp>
+#include <mutex>
+#include <string>
 #include <utility>
 #include <vector>
-#include <string>
+
+#include <boost/scoped_ptr.hpp>
 
 // NOTE: try not to add more headers here: runtime-state.h is included in many many files.
 #include "common/global-types.h"  // for PlanNodeId
@@ -187,12 +189,21 @@ class RuntimeState {
   Status LogOrReturnError(const ErrorMsg& message);
 
   bool is_cancelled() const { return is_cancelled_.Load(); }
+
+  /// Cancel this runtime state, signalling all condition variables and cancelling all
+  /// barriers added in AddCancellationCV() and AddBarrierToCancel(). This function will
+  /// acquire mutexes added in AddCancellationCV(), so the caller must not hold any locks
+  /// that must be acquired after those mutexes in the lock order.
   void Cancel();
+
   /// Add a condition variable to be signalled when this RuntimeState is cancelled.
   /// Adding a condition variable multiple times is a no-op. Each distinct 'cv' will be
-  /// signalled once with NotifyAll() when is_cancelled() becomes true.
+  /// signalled once with NotifyAll() when is_cancelled() becomes true. 'mutex' will
+  /// be acquired by the cancelling thread after is_cancelled() becomes true. The caller
+  /// must hold 'mutex' when checking is_cancelled() to avoid a race like IMPALA-9611
+  /// where the notification on 'cv' is lost.
   /// The condition variable must have query lifetime.
-  void AddCancellationCV(ConditionVariable* cv);
+  void AddCancellationCV(std::mutex* mutex, ConditionVariable* cv);
 
   /// Add a barrier to be cancelled when this RuntimeState is cancelled. Adding a barrier
   /// multiple times is a no-op. Each distinct 'cb' will be cancelled with status code
@@ -371,7 +382,7 @@ class RuntimeState {
 
   /// Condition variables that will be signalled by Cancel(). Protected by
   /// 'cancellation_cvs_lock_'.
-  std::vector<ConditionVariable*> cancellation_cvs_;
+  std::vector<std::pair<std::mutex*, ConditionVariable*>> cancellation_cvs_;
 
   /// Cyclic barriers that will be signalled by Cancel(). Protected by
   /// 'cancellation_cvs_lock_'.
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index c3711aa..033b485 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -25,11 +25,13 @@ from collections import defaultdict
 from time import sleep
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.skip import SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
     SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
+from tests.verifiers.metric_verifier import MetricVerifier
 
 FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
 # Not included:
@@ -125,6 +127,11 @@ class TestFailpoints(ImpalaTestSuite):
     del vector.get_value('exec_option')['debug_action']
     self.execute_query(query, vector.get_value('exec_option'))
 
+    # Detect any hung fragments left from this test.
+    for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
+      verifier = MetricVerifier(impalad.service)
+      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
+
   def __parse_plan_nodes_from_explain(self, query, vector):
     """Parses the EXPLAIN <query> output and returns a list of node ids.
     Expects format of <ID>:<NAME>"""