Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/21 08:22:36 UTC

[impala] 01/03: IMPALA-12233: Fixed PHJ hanging caused by cyclic barrier

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 49d599c7f737388db71f531091d285f03ae1f2f4
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Thu Jul 13 16:56:35 2023 +0200

    IMPALA-12233: Fixed PHJ hanging caused by cyclic barrier
    
    Partitioned Hash Join with a limit could hang when using mt_dop>0,
    because the cyclic barrier in PHJBuilder was not cancelled properly.
    Added the ability to unregister threads from the synchronization and
    a call to it in PHJNode::Close(), so closing threads won't block
    threads that are still processing.
    
    Testing:
      - Added new unit tests covering the new feature
      - Added e2e test to make sure the join does not hang
    
    Change-Id: I8be75c7ce99c015964c8bbb547539e6619ba4f9b
    Reviewed-on: http://gerrit.cloudera.org:8080/20179
    Reviewed-by: Michael Smith <mi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/partitioned-hash-join-builder.cc       |   4 +
 be/src/exec/partitioned-hash-join-builder.h        |   3 +
 be/src/exec/partitioned-hash-join-node.cc          |   5 +-
 be/src/util/cyclic-barrier-test.cc                 | 100 +++++++++++++++++++++
 be/src/util/cyclic-barrier.cc                      |  12 +++
 be/src/util/cyclic-barrier.h                       |  17 ++--
 .../queries/QueryTest/joins_mt_dop.test            |  10 +++
 7 files changed, 145 insertions(+), 6 deletions(-)
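
The mechanism behind the fix can be pictured with a small standalone barrier. The
sketch below is illustrative only: MiniCyclicBarrier is a hypothetical class built on
std::condition_variable, not Impala's CyclicBarrier, but it follows the same
Wait()/Unregister() contract as the patch. Unregister() lowers the number of expected
threads, and if every remaining thread is already waiting it wakes one of them to run
the per-cycle callback, so a thread that finishes early (for example because a limit
was reached) no longer leaves the others blocked forever.

// Illustrative sketch only; MiniCyclicBarrier is hypothetical and not Impala's
// CyclicBarrier. Compile with: g++ -std=c++17 -pthread mini_cyclic_barrier.cc
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

class MiniCyclicBarrier {
 public:
  explicit MiniCyclicBarrier(int num_threads) : num_threads_(num_threads) {}

  // Blocks until all registered threads have arrived; the last arrival (or a
  // thread woken by the final Unregister()) runs 'fn' and starts the next cycle.
  void Wait(const std::function<void()>& fn) {
    std::unique_lock<std::mutex> l(lock_);
    ++num_waiting_;
    if (num_waiting_ < num_threads_) {
      const int64_t start_cycle = cycle_;
      // Also recheck num_waiting_ >= num_threads_: Unregister() may have lowered
      // the target while this thread slept, making it the effective last arrival.
      cv_.wait(l, [&] { return cycle_ != start_cycle || num_waiting_ >= num_threads_; });
      if (cycle_ != start_cycle) return;  // Another thread completed the cycle.
    }
    fn();
    num_waiting_ = 0;
    ++cycle_;
    cv_.notify_all();
  }

  // Removes one thread from the synchronization. If all remaining threads are
  // already waiting, wake one of them so it can complete the cycle.
  void Unregister() {
    std::lock_guard<std::mutex> l(lock_);
    --num_threads_;
    if (num_waiting_ >= num_threads_) cv_.notify_one();
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  int num_threads_;
  int num_waiting_ = 0;
  int64_t cycle_ = 0;
};

int main() {
  MiniCyclicBarrier barrier(3);
  int callback_runs = 0;
  // Two "probe" threads reach the barrier; the third finishes early and only
  // unregisters. Before the fix, the equivalent situation hung the other two.
  std::thread t1([&] { barrier.Wait([&] { ++callback_runs; }); });
  std::thread t2([&] { barrier.Wait([&] { ++callback_runs; }); });
  std::thread t3([&] { barrier.Unregister(); });
  t1.join();
  t2.join();
  t3.join();
  std::cout << "cycle callback ran " << callback_runs << " time(s)" << std::endl;  // 1
  return 0;
}

Without the Unregister() call in the example's third thread, the first two Wait()
calls would block indefinitely, which is the shape of the hang fixed by this patch.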

diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 1204e3ed2..067214653 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -1383,6 +1383,10 @@ string PhjBuilder::DebugString() const {
   ss << " buffer_pool_client=" << buffer_pool_client_->DebugString();
   return ss.str();
 }
+void PhjBuilder::UnregisterThreadFromBarrier() const {
+  DCHECK(probe_barrier_ != nullptr);
+  probe_barrier_->Unregister();
+}
 
 Status PhjBuilderConfig::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
     llvm::Function* hash_fn, llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 70fa4491f..dd578e318 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -541,6 +541,9 @@ class PhjBuilder : public JoinBuilder {
 
   std::string DebugString() const;
 
+  /// Unregisters one probe thread from the barrier.
+  void UnregisterThreadFromBarrier() const;
+
   /// Computes the minimum reservation required to execute the spilling partitioned
   /// hash algorithm successfully for any input size (assuming enough disk space is
   /// available for spilled rows). This includes buffers used by the build side,
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index c62dfd3f0..b3e594f1d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -24,8 +24,8 @@
 
 #include "codegen/llvm-codegen.h"
 #include "exec/blocking-join-node.inline.h"
-#include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hash-table.inline.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
@@ -35,6 +35,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "util/cyclic-barrier.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -309,6 +310,8 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
       builder_->CloseFromProbe(state);
       waited_for_build_ = false;
     }
+
+    if (builder_->num_probe_threads() > 1) builder_->UnregisterThreadFromBarrier();
   }
   ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
   if (probe_expr_results_pool_ != nullptr) probe_expr_results_pool_->FreeAll();
diff --git a/be/src/util/cyclic-barrier-test.cc b/be/src/util/cyclic-barrier-test.cc
index cafacd4d0..033a4553f 100644
--- a/be/src/util/cyclic-barrier-test.cc
+++ b/be/src/util/cyclic-barrier-test.cc
@@ -155,6 +155,106 @@ TEST(CyclicBarrierTest, Cancellation) {
   IMPALA_ASSERT_DEBUG_DEATH(barrier.Cancel(Status::OK()), "");
 }
 
+// Test that unregister functions as expected when the last call is an Unregister.
+TEST(CyclicBarrierTest, UnregisterLast) {
+  const int NUM_THREADS = 8;
+  const int NUM_OF_UNREGISTERING_THREADS = 6;
+  int counter = 0;
+  AtomicInt32 waits_complete{0};
+  AtomicInt32 unregisters_complete{0};
+  CyclicBarrier barrier(NUM_THREADS + NUM_OF_UNREGISTERING_THREADS);
+  thread_group threads;
+  thread_group unregistering_threads;
+  // All threads should join the barrier, waiting.
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    threads.add_thread(new thread([&barrier, &waits_complete, &counter]() {
+      Status status = barrier.Wait([&counter]() {
+        ++counter;
+        return Status::OK();
+      });
+      EXPECT_TRUE(status.ok());
+      waits_complete.Add(1);
+    }));
+  }
+  SleepForMs(10); // Give other threads a chance to start before unregistering
+  EXPECT_EQ(0, counter) << "The callback should not have run.";
+
+  // All unregistering threads but one call Unregister(); the waiters stay blocked.
+  for (int i = 0; i < NUM_OF_UNREGISTERING_THREADS - 1; ++i) {
+    unregistering_threads.add_thread(new thread([&barrier, &unregisters_complete]() {
+      barrier.Unregister();
+      unregisters_complete.Add(1);
+    }));
+  }
+  unregistering_threads.join_all();
+
+  EXPECT_EQ(0, counter) << "The callback should not have run.";
+  EXPECT_EQ(0, waits_complete.Load()) << "Threads should not have returned.";
+  EXPECT_EQ(NUM_OF_UNREGISTERING_THREADS - 1, unregisters_complete.Load())
+      << "Unregisters should have returned.";
+
+  barrier.Unregister();
+
+  threads.join_all();
+
+  EXPECT_EQ(1, counter) << "Counter should have been incremented by the woken up thread "
+                           "after the last Unregister.";
+  EXPECT_EQ(NUM_THREADS, waits_complete.Load()) << "Threads should have returned.";
+}
+
+// Test that unregister functions as expected when the last call is a Wait.
+TEST(CyclicBarrierTest, UnregisterNotLast) {
+  const int NUM_THREADS = 8;
+  const int NUM_OF_UNREGISTERING_THREADS = 6;
+  int counter = 0;
+  AtomicInt32 waits_complete{0};
+  AtomicInt32 unregisters_complete{0};
+  CyclicBarrier barrier(NUM_THREADS + NUM_OF_UNREGISTERING_THREADS);
+  thread_group threads;
+  thread_group unregistering_threads;
+  // All threads but one should join the barrier, waiting.
+  for (int i = 0; i < NUM_THREADS - 1; ++i) {
+    threads.add_thread(new thread([&barrier, &waits_complete, &counter]() {
+      Status status = barrier.Wait([&counter]() {
+        ++counter;
+        return Status::OK();
+      });
+      EXPECT_TRUE(status.ok());
+      waits_complete.Add(1);
+    }));
+  }
+  SleepForMs(10); // Give other threads a chance to start before unregistering
+  EXPECT_EQ(0, counter) << "The callback should not have run.";
+
+  // All unregistering threads unregister; the waiters still block on the final Wait().
+  for (int i = 0; i < NUM_OF_UNREGISTERING_THREADS; ++i) {
+    unregistering_threads.add_thread(new thread([&barrier, &unregisters_complete]() {
+      barrier.Unregister();
+      unregisters_complete.Add(1);
+    }));
+  }
+  unregistering_threads.join_all();
+
+  EXPECT_EQ(0, counter) << "The callback should not have run.";
+  EXPECT_EQ(0, waits_complete.Load()) << "Threads should not have returned.";
+  EXPECT_EQ(NUM_OF_UNREGISTERING_THREADS, unregisters_complete.Load())
+      << "Unregisters should have returned.";
+
+  threads.add_thread(new thread([&barrier, &waits_complete, &counter]() {
+    Status status = barrier.Wait([&counter]() {
+      ++counter;
+      return Status::OK();
+    });
+    EXPECT_TRUE(status.ok());
+    waits_complete.Add(1);
+  }));
+
+  threads.join_all();
+
+  EXPECT_EQ(1, counter) << "Counter should have been incremented by the last thread.";
+  EXPECT_EQ(NUM_THREADS, waits_complete.Load()) << "Threads should have returned.";
+}
+
 // Passing an empty/null function to Wait() is not supported.
 TEST(CyclicBarrierTest, NullFunction) {
   CyclicBarrier barrier(1);
diff --git a/be/src/util/cyclic-barrier.cc b/be/src/util/cyclic-barrier.cc
index aed0efda8..af681a774 100644
--- a/be/src/util/cyclic-barrier.cc
+++ b/be/src/util/cyclic-barrier.cc
@@ -34,4 +34,16 @@ void CyclicBarrier::Cancel(const Status& err) {
   }
   barrier_cv_.NotifyAll();
 }
+
+void CyclicBarrier::Unregister() {
+  bool notify = false;
+  {
+    unique_lock<mutex> l(lock_);
+    if (!cancel_status_.ok()) return; // Already cancelled.
+    --num_threads_;
+    DCHECK_GE(num_threads_, 0);
+    if (num_waiting_threads_ == num_threads_) notify = true;
+  }
+  if (notify) barrier_cv_.NotifyOne();
+}
 } // namespace impala
diff --git a/be/src/util/cyclic-barrier.h b/be/src/util/cyclic-barrier.h
index 065c1ae7b..9c845db2c 100644
--- a/be/src/util/cyclic-barrier.h
+++ b/be/src/util/cyclic-barrier.h
@@ -54,13 +54,16 @@ class CyclicBarrier {
       if (num_waiting_threads_ < num_threads_) {
         // Wait for the last thread to wake us up.
         int64_t start_cycle = cycle_num_;
-        while (cancel_status_.ok() && cycle_num_ == start_cycle) {
+        while (cancel_status_.ok() && cycle_num_ == start_cycle
+            && num_waiting_threads_ < num_threads_) {
           barrier_cv_.Wait(l);
         }
-        return cancel_status_;
+        if (!cancel_status_.ok() || cycle_num_ > start_cycle) {
+          return cancel_status_;
+        }
       }
-      // This is the last thread and barrier isn't cancelled. We can proceed by
-      // resetting state for the next cycle.
+      // This is the last thread, or a thread woken up by the last Unregister(), and the
+      // barrier isn't cancelled. We can proceed by resetting state for the next cycle.
       fn_status = fn();
       if (fn_status.ok()) {
         num_waiting_threads_ = 0;
@@ -79,9 +82,13 @@ class CyclicBarrier {
   // 'err' must be a non-OK status.
   void Cancel(const Status& err);
 
+  // Unregisters one thread from the synchronization. If the unregistering thread was
+  // the last one outstanding, wakes up one waiting thread to execute 'fn' of Wait().
+  void Unregister();
+
  private:
   // The number of threads participating in synchronization.
-  const int num_threads_;
+  int num_threads_;
 
   // Protects below members.
   std::mutex lock_;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
index 02c56139e..a3a2bfcc3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
@@ -49,3 +49,13 @@ int,int
 6,6
 7,7
 ====
+---- QUERY
+# IMPALA-12233: make sure PHJ does not hang with limit
+select ss_cdemo_sk from tpcds.store_sales where ss_sold_date_sk = (select max(ss_sold_date_sk) from tpcds.store_sales) group by ss_cdemo_sk limit 3;
+---- RESULTS
+row_regex: [0-9]*
+row_regex: [0-9]*
+row_regex: [0-9]*
+---- TYPES
+int
+====
\ No newline at end of file