Posted to commits@impala.apache.org by st...@apache.org on 2020/04/08 03:40:24 UTC

[impala] branch master updated (dbd2236 -> a2e9409)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from dbd2236  IMPALA-9545 Decide cacheline size of aarch64
     new 7fd046d  IMPALA-9611: fix hang when cancelling join builder
     new a2e9409  IMPALA-9565 Remove unused included file mm_malloc.h on ARM

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/join-builder.cc       |  4 ++--
 be/src/kudu/util/memory/memory.cc |  3 +++
 be/src/kudu/util/striped64.cc     |  4 ++++
 be/src/runtime/runtime-state.cc   | 20 +++++++++++++++-----
 be/src/runtime/runtime-state.h    | 21 ++++++++++++++++-----
 tests/failure/test_failpoints.py  |  7 +++++++
 6 files changed, 47 insertions(+), 12 deletions(-)


[impala] 01/02: IMPALA-9611: fix hang when cancelling join builder

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7fd046d00f9cd3106ff2c617c53ab6d183920785
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sun Apr 5 12:44:53 2020 -0700

    IMPALA-9611: fix hang when cancelling join builder
    
    The hang could occur in the following scenario, where
    thread A is executing a join build fragment and thread
    B is cancelling the fragment instance.
    1. Thread A is in HandoffToProbesAndWait(), reads is_cancelled_
      and sees false.
    2. Thread B in RuntimeState::Cancel() sets is_cancelled_ = true,
      acquires cancellation_cvs_lock_, then calls NotifyAll() on the
      condition variable.
    3. Thread A calls Wait() on the condition variable and blocks forever,
      because the NotifyAll() from step 2 has already happened.
    
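    The fix is for thread B to also acquire the lock that thread A holds
    while it checks is_cancelled_ and waits (separate_build_lock_ in
    JoinBuilder). That prevents the race because #1 and #3 above are in the
    same critical section, so thread B cannot signal the condition variable
    until thread A has released the lock, either by leaving the critical
    section or by blocking in Wait(), which releases it atomically.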
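A minimal sketch of the fixed protocol, using standard C++ primitives (std::mutex, std::condition_variable, std::atomic<bool>) in place of Impala's own ConditionVariable and atomic wrappers; that substitution is an assumption made to keep the example self-contained. CancelDemoState, WaitUntilCancelled, CancelAndNotify and RunCancelDemo are hypothetical names. The waiter reads the flag only while holding the mutex, and the canceller takes and drops the same mutex between setting the flag and notifying, so the wakeup can no longer fall between steps #1 and #3 of the scenario above.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the state shared by the build thread (A) and the
// cancelling thread (B).
struct CancelDemoState {
  std::mutex lock;                     // role of separate_build_lock_
  std::condition_variable cv;          // role of build_wakeup_cv_
  std::atomic<bool> cancelled{false};  // role of is_cancelled_
};

// Thread A: the check and the wait share one critical section. wait()
// releases 'lock' atomically while blocked and re-acquires it on wakeup.
void WaitUntilCancelled(CancelDemoState* s) {
  std::unique_lock<std::mutex> l(s->lock);
  while (!s->cancelled.load()) s->cv.wait(l);
}

// Thread B: publish the flag, take and drop A's mutex, then notify. While B
// holds the mutex, A cannot be between its check and its wait(), so the
// notification cannot be lost.
void CancelAndNotify(CancelDemoState* s) {
  s->cancelled.store(true);
  { std::lock_guard<std::mutex> l(s->lock); }
  s->cv.notify_all();
}

// Runs one waiter and one canceller; returns true once both have finished.
bool RunCancelDemo() {
  CancelDemoState s;
  std::thread a(WaitUntilCancelled, &s);
  std::thread b(CancelAndNotify, &s);
  a.join();
  b.join();
  return true;
}
```

Dropping the lock before notify_all() matches the comment in the patched RuntimeState::Cancel(): a woken thread can re-acquire the mutex immediately instead of blocking on the canceller.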
    
    Testing:
    Added a metric check to test_failpoints that waits for
    impala-server.num-fragments-in-flight to reach 0 on every impalad,
    making it easier to detect hangs caused by those tests in the future.
    
    Looped test_failpoints.py overnight, which was previously enough
    to reproduce the failure within a couple of hours.
    
    Ran exhaustive tests.
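The metric check works because a fragment left hung by a lost wakeup keeps impala-server.num-fragments-in-flight above 0, so waiting for that metric to reach 0 surfaces the hang as a timeout. The core of such a wait is a bounded polling loop, sketched below in C++ as a hypothetical WaitForMetric() helper. The real MetricVerifier.wait_for_metric() is Python, and its timeout and polling interval are not shown in this diff, so the values here are assumptions.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: poll 'read_metric' until it returns 'expected' or
// 'timeout' elapses. Returns true if the expected value was observed.
template <typename ReadFn>
bool WaitForMetric(ReadFn read_metric, long expected,
                   std::chrono::milliseconds timeout,
                   std::chrono::milliseconds poll_interval =
                       std::chrono::milliseconds(10)) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (true) {
    if (read_metric() == expected) return true;
    if (std::chrono::steady_clock::now() >= deadline) return false;
    std::this_thread::sleep_for(poll_interval);
  }
}
```

For example, a reader that reports 3, 2, 1 and then 0 fragments in flight makes the wait succeed after a few polls, while a reader stuck at 1 makes it return false once the timeout expires.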
    
    Change-Id: I996ad2055d6542eb57e12c663b89de5f84208f77
    Reviewed-on: http://gerrit.cloudera.org:8080/15672
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/join-builder.cc      |  4 ++--
 be/src/runtime/runtime-state.cc  | 20 +++++++++++++++-----
 be/src/runtime/runtime-state.h   | 21 ++++++++++++++++-----
 tests/failure/test_failpoints.py |  7 +++++++
 4 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc
index 880dbd1..e98b084 100644
--- a/be/src/exec/join-builder.cc
+++ b/be/src/exec/join-builder.cc
@@ -67,7 +67,7 @@ void JoinBuilder::CloseFromProbe(RuntimeState* join_node_state) {
 
 Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) {
   DCHECK(is_separate_build_);
-  join_node_state->AddCancellationCV(&probe_wakeup_cv_);
+  join_node_state->AddCancellationCV(&separate_build_lock_, &probe_wakeup_cv_);
   VLOG(2) << "JoinBuilder (id=" << join_node_id_ << ")"
           << " WaitForInitialBuild() called by finstance "
           << PrintId(join_node_state->fragment_instance_id());
@@ -97,7 +97,7 @@ Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) {
 void JoinBuilder::HandoffToProbesAndWait(RuntimeState* build_side_state) {
   DCHECK(is_separate_build_) << "Doesn't make sense for embedded builder.";
   VLOG(2) << "Initial build ready JoinBuilder (id=" << join_node_id_ << ")";
-  build_side_state->AddCancellationCV(&build_wakeup_cv_);
+  build_side_state->AddCancellationCV(&separate_build_lock_, &build_wakeup_cv_);
   {
     unique_lock<mutex> l(separate_build_lock_);
     ready_to_probe_ = true;
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 4cb68e3..d314906 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -229,20 +229,30 @@ void RuntimeState::Cancel() {
   is_cancelled_.Store(true);
   {
     lock_guard<SpinLock> l(cancellation_cvs_lock_);
-    for (ConditionVariable* cv : cancellation_cvs_) cv->NotifyAll();
+    for (pair<std::mutex*, ConditionVariable*>& entry : cancellation_cvs_) {
+      // Acquire the lock to prevent races between readers of 'is_cancelled_' and this
+      // writing thread (e.g. IMPALA-9611) - the caller should read 'is_cancelled_' while
+      // holding the lock. Drop it before signalling the CV so that a blocked thread can
+      // immediately acquire the mutex when it wakes up.
+      {
+        lock_guard<mutex> l(*entry.first);
+      }
+      entry.second->NotifyAll();
+    }
     for (CyclicBarrier* cb : cancellation_cbs_) {
       cb->Cancel(Status::CancelledInternal("RuntimeState::Cancel()"));
     }
   }
+
 }
 
-void RuntimeState::AddCancellationCV(ConditionVariable* cv) {
+void RuntimeState::AddCancellationCV(mutex* mutex, ConditionVariable* cv) {
   lock_guard<SpinLock> l(cancellation_cvs_lock_);
-  for (ConditionVariable* cv2 : cancellation_cvs_) {
+  for (pair<std::mutex*, ConditionVariable*>& entry : cancellation_cvs_) {
     // Don't add if already present.
-    if (cv == cv2) return;
+    if (mutex == entry.first && cv == entry.second) return;
   }
-  cancellation_cvs_.push_back(cv);
+  cancellation_cvs_.push_back(make_pair(mutex, cv));
 }
 
 void RuntimeState::AddBarrierToCancel(CyclicBarrier* cb) {
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 102db23..91e500e 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -19,10 +19,12 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H
 #define IMPALA_RUNTIME_RUNTIME_STATE_H
 
-#include <boost/scoped_ptr.hpp>
+#include <mutex>
+#include <string>
 #include <utility>
 #include <vector>
-#include <string>
+
+#include <boost/scoped_ptr.hpp>
 
 // NOTE: try not to add more headers here: runtime-state.h is included in many many files.
 #include "common/global-types.h"  // for PlanNodeId
@@ -187,12 +189,21 @@ class RuntimeState {
   Status LogOrReturnError(const ErrorMsg& message);
 
   bool is_cancelled() const { return is_cancelled_.Load(); }
+
+  /// Cancel this runtime state, signalling all condition variables and cancelling all
+  /// barriers added in AddCancellationCV() and AddBarrierToCancel(). This function will
+  /// acquire mutexes added in AddCancellationCV(), so the caller must not hold any locks
+  /// that must acquire after those mutexes in the lock order.
   void Cancel();
+
   /// Add a condition variable to be signalled when this RuntimeState is cancelled.
   /// Adding a condition variable multiple times is a no-op. Each distinct 'cv' will be
-  /// signalled once with NotifyAll() when is_cancelled() becomes true.
+  /// signalled once with NotifyAll() when is_cancelled() becomes true. 'mutex' will
+  /// be acquired by the cancelling thread after is_cancelled() becomes true. The caller
+  /// must hold 'mutex' when checking is_cancelled() to avoid a race like IMPALA-9611
+  /// where the notification on 'cv' is lost.
   /// The condition variable must have query lifetime.
-  void AddCancellationCV(ConditionVariable* cv);
+  void AddCancellationCV(std::mutex* mutex, ConditionVariable* cv);
 
   /// Add a barrier to be cancelled when this RuntimeState is cancelled. Adding a barrier
   /// multiple times is a no-op. Each distinct 'cb' will be cancelled with status code
@@ -371,7 +382,7 @@ class RuntimeState {
 
   /// Condition variables that will be signalled by Cancel(). Protected by
   /// 'cancellation_cvs_lock_'.
-  std::vector<ConditionVariable*> cancellation_cvs_;
+  std::vector<std::pair<std::mutex*, ConditionVariable*>> cancellation_cvs_;
 
   /// Cyclic barriers that will be signalled by Cancel(). Protected by
   /// 'cancellation_cvs_lock_'.
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index c3711aa..033b485 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -25,11 +25,13 @@ from collections import defaultdict
 from time import sleep
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.skip import SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
     SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
+from tests.verifiers.metric_verifier import MetricVerifier
 
 FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
 # Not included:
@@ -125,6 +127,11 @@ class TestFailpoints(ImpalaTestSuite):
     del vector.get_value('exec_option')['debug_action']
     self.execute_query(query, vector.get_value('exec_option'))
 
+    # Detect any hung fragments left from this test.
+    for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
+      verifier = MetricVerifier(impalad.service)
+      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
+
   def __parse_plan_nodes_from_explain(self, query, vector):
     """Parses the EXPLAIN <query> output and returns a list of node ids.
     Expects format of <ID>:<NAME>"""
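The new RuntimeState contract (register a mutex together with each condition variable; Cancel() takes each registered mutex before calling NotifyAll()) can be modelled in a few lines. This is a simplified sketch, not Impala's code: CancellationRegistry is a hypothetical class, a std::mutex stands in for the SpinLock cancellation_cvs_lock_, and std::condition_variable stands in for Impala's ConditionVariable. As in the patched AddCancellationCV(), an exact duplicate (mutex, cv) pair is ignored.

```cpp
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified model of RuntimeState's cancellation bookkeeping.
class CancellationRegistry {
 public:
  // Remember 'cv' and the mutex its waiters hold while checking
  // is_cancelled(). Registering the same pair twice is a no-op.
  void AddCancellationCV(std::mutex* mu, std::condition_variable* cv) {
    std::lock_guard<std::mutex> l(registry_lock_);
    const std::pair<std::mutex*, std::condition_variable*> entry(mu, cv);
    if (std::find(cvs_.begin(), cvs_.end(), entry) != cvs_.end()) return;
    cvs_.push_back(entry);
  }

  // Set the flag, then for each entry take and drop its mutex before
  // notifying, so a waiter that saw the flag unset is already in wait().
  void Cancel() {
    cancelled_.store(true);
    std::lock_guard<std::mutex> l(registry_lock_);
    for (const auto& entry : cvs_) {
      { std::lock_guard<std::mutex> waiter_lock(*entry.first); }
      entry.second->notify_all();
    }
  }

  bool is_cancelled() const { return cancelled_.load(); }

  std::size_t num_cvs() const {
    std::lock_guard<std::mutex> l(registry_lock_);
    return cvs_.size();
  }

 private:
  mutable std::mutex registry_lock_;  // stands in for cancellation_cvs_lock_
  std::atomic<bool> cancelled_{false};
  std::vector<std::pair<std::mutex*, std::condition_variable*>> cvs_;
};
```

In this model Cancel() holds registry_lock_ while taking each registered mutex, so a thread must not call AddCancellationCV() or Cancel() while holding one of those mutexes. Registering before taking the mutex, as HandoffToProbesAndWait() does, avoids that lock-order inversion.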


[impala] 02/02: IMPALA-9565 Remove unused included file mm_malloc.h on ARM

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a2e94095a8f8df9432eec857aa6798a61bb2362c
Author: zhaorenhai <zh...@hotmail.com>
AuthorDate: Sat Mar 28 04:35:28 2020 +0000

    IMPALA-9565 Remove unused included file mm_malloc.h on ARM
    
    Stop including the unused mm_malloc.h header on ARM (AArch64),
    because the AArch64 version of gcc doesn't provide mm_malloc.h.
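The guard pattern from this commit can be tried in isolation. The sketch keeps the exact `#ifndef __aarch64__` guard; AllocateAligned() and IsAligned() are hypothetical helpers added only so the file does something. They use POSIX posix_memalign(), which is declared by <cstdlib> on glibc-based Linux on both x86-64 and AArch64, so the file compiles whether or not <mm_malloc.h> exists on the target.

```cpp
// As in the commit: AArch64 gcc does not ship <mm_malloc.h>, so only include
// it on other targets.
#ifndef __aarch64__
#include <mm_malloc.h>
#endif

#include <cstddef>
#include <cstdint>
#include <cstdlib>

// Hypothetical helper: returns 'size' bytes aligned to 'alignment' (a power of
// two that is a multiple of sizeof(void*)), or nullptr on failure. Release the
// memory with std::free().
void* AllocateAligned(std::size_t alignment, std::size_t size) {
  void* p = nullptr;
  if (posix_memalign(&p, alignment, size) != 0) return nullptr;
  return p;
}

// True if 'p' is aligned to 'alignment' bytes.
bool IsAligned(const void* p, std::size_t alignment) {
  return reinterpret_cast<std::uintptr_t>(p) % alignment == 0;
}
```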
    
    Change-Id: I4ea1f654b4da0e658843fad1c1f6de99b784dcff
    Reviewed-on: http://gerrit.cloudera.org:8080/15586
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/kudu/util/memory/memory.cc | 3 +++
 be/src/kudu/util/striped64.cc     | 4 ++++
 2 files changed, 7 insertions(+)

diff --git a/be/src/kudu/util/memory/memory.cc b/be/src/kudu/util/memory/memory.cc
index b3964df..4555fe6 100644
--- a/be/src/kudu/util/memory/memory.cc
+++ b/be/src/kudu/util/memory/memory.cc
@@ -20,7 +20,10 @@
 
 #include "kudu/util/memory/memory.h"
 
+// Aarch64 version gcc doesn't have mm_malloc.h.
+#ifndef __aarch64__
 #include <mm_malloc.h>
+#endif
 
 #include <algorithm>
 #include <cstdlib>
diff --git a/be/src/kudu/util/striped64.cc b/be/src/kudu/util/striped64.cc
index 789a395..4482261 100644
--- a/be/src/kudu/util/striped64.cc
+++ b/be/src/kudu/util/striped64.cc
@@ -17,7 +17,11 @@
 
 #include "kudu/util/striped64.h"
 
+// Aarch64 version gcc doesn't have mm_malloc.h.
+#ifndef __aarch64__
 #include <mm_malloc.h>
+#endif
+
 #include <unistd.h>
 
 #include <cstdlib>