You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/21 08:22:36 UTC
[impala] 01/03: IMPALA-12233: Fixed PHJ hanging caused by cyclic barrier
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 49d599c7f737388db71f531091d285f03ae1f2f4
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Thu Jul 13 16:56:35 2023 +0200
IMPALA-12233: Fixed PHJ hanging caused by cyclic barrier
Partitioned Hash Join with a limit could hang when using mt_dop>0,
because the cyclic barrier in PhjBuilder was not cancelled properly.
Added the ability to unregister threads from the synchronization, and a
call to it in PartitionedHashJoinNode::Close(), so that threads that are
closing no longer block the threads that are still processing.
Testing:
- Added new unit tests covering the new feature
- Added an e2e test to make sure the join does not hang
Change-Id: I8be75c7ce99c015964c8bbb547539e6619ba4f9b
Reviewed-on: http://gerrit.cloudera.org:8080/20179
Reviewed-by: Michael Smith <mi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/partitioned-hash-join-builder.cc | 4 +
be/src/exec/partitioned-hash-join-builder.h | 3 +
be/src/exec/partitioned-hash-join-node.cc | 5 +-
be/src/util/cyclic-barrier-test.cc | 100 +++++++++++++++++++++
be/src/util/cyclic-barrier.cc | 12 +++
be/src/util/cyclic-barrier.h | 17 ++--
.../queries/QueryTest/joins_mt_dop.test | 10 +++
7 files changed, 145 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 1204e3ed2..067214653 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -1383,6 +1383,10 @@ string PhjBuilder::DebugString() const {
ss << " buffer_pool_client=" << buffer_pool_client_->DebugString();
return ss.str();
}
+void PhjBuilder::UnregisterThreadFromBarrier() const { // Removes one participant from 'probe_barrier_'.
+ DCHECK(probe_barrier_ != nullptr); // The barrier must exist when this is called.
+ probe_barrier_->Unregister();
+}
Status PhjBuilderConfig::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
llvm::Function* hash_fn, llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 70fa4491f..dd578e318 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -541,6 +541,9 @@ class PhjBuilder : public JoinBuilder {
std::string DebugString() const;
+ /// Unregisters one probe thread from 'probe_barrier_' so that the remaining threads no longer wait for it.
+ void UnregisterThreadFromBarrier() const;
+
/// Computes the minimum reservation required to execute the spilling partitioned
/// hash algorithm successfully for any input size (assuming enough disk space is
/// available for spilled rows). This includes buffers used by the build side,
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index c62dfd3f0..b3e594f1d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -24,8 +24,8 @@
#include "codegen/llvm-codegen.h"
#include "exec/blocking-join-node.inline.h"
-#include "exec/exec-node.inline.h"
#include "exec/exec-node-util.h"
+#include "exec/exec-node.inline.h"
#include "exec/hash-table.inline.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
@@ -35,6 +35,7 @@
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
+#include "util/cyclic-barrier.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
@@ -309,6 +310,8 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
builder_->CloseFromProbe(state);
waited_for_build_ = false;
}
+
+ if (builder_->num_probe_threads() > 1) builder_->UnregisterThreadFromBarrier();
}
ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
if (probe_expr_results_pool_ != nullptr) probe_expr_results_pool_->FreeAll();
diff --git a/be/src/util/cyclic-barrier-test.cc b/be/src/util/cyclic-barrier-test.cc
index cafacd4d0..033a4553f 100644
--- a/be/src/util/cyclic-barrier-test.cc
+++ b/be/src/util/cyclic-barrier-test.cc
@@ -155,6 +155,106 @@ TEST(CyclicBarrierTest, Cancellation) {
IMPALA_ASSERT_DEBUG_DEATH(barrier.Cancel(Status::OK()), "");
}
+// Tests that Unregister() works as expected when the final call on the barrier is an Unregister().
+TEST(CyclicBarrierTest, UnregisterLast) {
+ const int NUM_THREADS = 8;
+ const int NUM_OF_UNREGISTERING_THREADS = 6;
+ int counter = 0;
+ AtomicInt32 waits_complete{0};
+ AtomicInt32 unregisters_complete{0};
+ CyclicBarrier barrier(NUM_THREADS + NUM_OF_UNREGISTERING_THREADS); // 8 waiters + 6 that unregister.
+ thread_group threads;
+ thread_group unregistering_threads;
+ // All NUM_THREADS waiter threads should join the barrier and block in Wait().
+ for (int i = 0; i < NUM_THREADS; ++i) {
+ threads.add_thread(new thread([&barrier, &waits_complete, &counter]() {
+ Status status = barrier.Wait([&counter]() {
+ ++counter;
+ return Status::OK();
+ });
+ EXPECT_TRUE(status.ok());
+ waits_complete.Add(1);
+ }));
+ }
+ SleepForMs(10); // Give other threads a chance to start before unregistering
+ EXPECT_EQ(0, counter) << "The callback should not have run.";
+
+ // All unregistering threads but one unregister; the waiters are still blocked in the barrier.
+ for (int i = 0; i < NUM_OF_UNREGISTERING_THREADS - 1; ++i) {
+ unregistering_threads.add_thread(new thread([&barrier, &unregisters_complete]() {
+ barrier.Unregister();
+ unregisters_complete.Add(1);
+ }));
+ }
+ unregistering_threads.join_all();
+
+ EXPECT_EQ(0, counter) << "The callback should not have run.";
+ EXPECT_EQ(0, waits_complete.Load()) << "Threads should not have returned.";
+ EXPECT_EQ(NUM_OF_UNREGISTERING_THREADS - 1, unregisters_complete.Load())
+ << "Unregisters should have returned.";
+
+ barrier.Unregister(); // Final Unregister(): every remaining participant is waiting, so one is woken.
+
+ threads.join_all();
+
+ EXPECT_EQ(1, counter) << "Counter should have been incremented by the woken up thread "
+ "after the last Unregister.";
+ EXPECT_EQ(NUM_THREADS, waits_complete.Load()) << "Threads should have returned.";
+}
+
+// Tests that Unregister() works as expected when the final call on the barrier is a Wait().
+TEST(CyclicBarrierTest, UnregisterNotLast) {
+ const int NUM_THREADS = 8;
+ const int NUM_OF_UNREGISTERING_THREADS = 6;
+ int counter = 0;
+ AtomicInt32 waits_complete{0};
+ AtomicInt32 unregisters_complete{0};
+ CyclicBarrier barrier(NUM_THREADS + NUM_OF_UNREGISTERING_THREADS); // 8 waiters + 6 that unregister.
+ thread_group threads;
+ thread_group unregistering_threads;
+ // All waiter threads but one should join the barrier and block in Wait().
+ for (int i = 0; i < NUM_THREADS - 1; ++i) {
+ threads.add_thread(new thread([&barrier, &waits_complete, &counter]() {
+ Status status = barrier.Wait([&counter]() {
+ ++counter;
+ return Status::OK();
+ });
+ EXPECT_TRUE(status.ok());
+ waits_complete.Add(1);
+ }));
+ }
+ SleepForMs(10); // Give other threads a chance to start before unregistering
+ EXPECT_EQ(0, counter) << "The callback should not have run.";
+
+ // All unregistering threads unregister; the waiters stay blocked until the last waiter arrives.
+ for (int i = 0; i < NUM_OF_UNREGISTERING_THREADS; ++i) {
+ unregistering_threads.add_thread(new thread([&barrier, &unregisters_complete]() {
+ barrier.Unregister();
+ unregisters_complete.Add(1);
+ }));
+ }
+ unregistering_threads.join_all();
+
+ EXPECT_EQ(0, counter) << "The callback should not have run.";
+ EXPECT_EQ(0, waits_complete.Load()) << "Threads should not have returned.";
+ EXPECT_EQ(NUM_OF_UNREGISTERING_THREADS, unregisters_complete.Load())
+ << "Unregisters should have returned.";
+
+ threads.add_thread(new thread([&barrier, &waits_complete, &counter]() { // The eighth and last waiter.
+ Status status = barrier.Wait([&counter]() {
+ ++counter;
+ return Status::OK();
+ });
+ EXPECT_TRUE(status.ok());
+ waits_complete.Add(1);
+ }));
+
+ threads.join_all();
+
+ EXPECT_EQ(1, counter) << "Counter should have been incremented by the last thread.";
+ EXPECT_EQ(NUM_THREADS, waits_complete.Load()) << "Threads should have returned.";
+}
+
// Passing an empty/null function to Wait() is not supported.
TEST(CyclicBarrierTest, NullFunction) {
CyclicBarrier barrier(1);
diff --git a/be/src/util/cyclic-barrier.cc b/be/src/util/cyclic-barrier.cc
index aed0efda8..af681a774 100644
--- a/be/src/util/cyclic-barrier.cc
+++ b/be/src/util/cyclic-barrier.cc
@@ -34,4 +34,16 @@ void CyclicBarrier::Cancel(const Status& err) {
}
barrier_cv_.NotifyAll();
}
+
+void CyclicBarrier::Unregister() {
+ bool notify = false;
+ {
+ unique_lock<mutex> l(lock_);
+ if (!cancel_status_.ok()) return; // Already cancelled.
+ --num_threads_; // One fewer participant to wait for.
+ DCHECK_GE(num_threads_, 0);
+ if (num_waiting_threads_ == num_threads_) notify = true; // Every remaining participant is waiting.
+ }
+ if (notify) barrier_cv_.NotifyOne(); // Issued after 'lock_' is released; the woken waiter runs 'fn'.
+}
} // namespace impala
diff --git a/be/src/util/cyclic-barrier.h b/be/src/util/cyclic-barrier.h
index 065c1ae7b..9c845db2c 100644
--- a/be/src/util/cyclic-barrier.h
+++ b/be/src/util/cyclic-barrier.h
@@ -54,13 +54,16 @@ class CyclicBarrier {
if (num_waiting_threads_ < num_threads_) {
// Wait for the last thread to wake us up.
int64_t start_cycle = cycle_num_;
- while (cancel_status_.ok() && cycle_num_ == start_cycle) {
+ while (cancel_status_.ok() && cycle_num_ == start_cycle
+ && num_waiting_threads_ < num_threads_) {
barrier_cv_.Wait(l);
}
- return cancel_status_;
+ if (!cancel_status_.ok() || cycle_num_ > start_cycle) {
+ return cancel_status_;
+ }
}
- // This is the last thread and barrier isn't cancelled. We can proceed by
- // resetting state for the next cycle.
+ // This is the last thread or a woken up thread by the last unregister and barrier
+ // isn't cancelled. We can proceed by resetting state for the next cycle.
fn_status = fn();
if (fn_status.ok()) {
num_waiting_threads_ = 0;
@@ -79,9 +82,13 @@ class CyclicBarrier {
// 'err' must be a non-OK status.
void Cancel(const Status& err);
+ // Removes one participant. If all remaining participants are already blocked in Wait(),
+ // wakes one of them to run its 'fn' and finish the cycle. No-op if already cancelled.
+ void Unregister();
+
private:
// The number of threads participating in synchronization.
- const int num_threads_;
+ int num_threads_;
// Protects below members.
std::mutex lock_;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
index 02c56139e..a3a2bfcc3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
@@ -49,3 +49,13 @@ int,int
6,6
7,7
====
+---- QUERY
+# IMPALA-12233: make sure PHJ does not hang with limit
+select ss_cdemo_sk from tpcds.store_sales where ss_sold_date_sk = (select max(ss_sold_date_sk) from tpcds.store_sales) group by ss_cdemo_sk limit 3;
+---- RESULTS
+row_regex: [0-9]*
+row_regex: [0-9]*
+row_regex: [0-9]*
+---- TYPES
+int
+====
\ No newline at end of file