You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/13 23:10:41 UTC
[arrow] branch master updated: ARROW-16498: [C++] Fix potential deadlock in arrow::compute::TaskScheduler
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 07135b254d ARROW-16498: [C++] Fix potential deadlock in arrow::compute::TaskScheduler
07135b254d is described below
commit 07135b254d17198ce6d4cb459c5d6c6955389bc3
Author: Weston Pace <we...@gmail.com>
AuthorDate: Fri May 13 13:10:20 2022 -1000
ARROW-16498: [C++] Fix potential deadlock in arrow::compute::TaskScheduler
Closes #13091 from westonpace/bugfix/ARROW-16498--task-scheduler-deadlock
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/CMakeLists.txt | 7 +-
cpp/src/arrow/compute/exec/task_util.cc | 17 ++
cpp/src/arrow/compute/exec/task_util.h | 2 +-
cpp/src/arrow/compute/exec/task_util_test.cc | 228 +++++++++++++++++++++++++++
cpp/src/arrow/compute/exec/util.h | 2 +-
5 files changed, 253 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt
index b2a21c2bd6..0a5f1f30ff 100644
--- a/cpp/src/arrow/compute/exec/CMakeLists.txt
+++ b/cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -34,7 +34,12 @@ add_arrow_compute_test(hash_join_node_test
key_hash_test.cc)
add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute")
add_arrow_compute_test(union_node_test PREFIX "arrow-compute")
-add_arrow_compute_test(util_test PREFIX "arrow-compute")
+add_arrow_compute_test(util_test
+ PREFIX
+ "arrow-compute"
+ SOURCES
+ util_test.cc
+ task_util_test.cc)
add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
diff --git a/cpp/src/arrow/compute/exec/task_util.cc b/cpp/src/arrow/compute/exec/task_util.cc
index e5e714d34a..8568450119 100644
--- a/cpp/src/arrow/compute/exec/task_util.cc
+++ b/cpp/src/arrow/compute/exec/task_util.cc
@@ -96,6 +96,11 @@ class TaskSchedulerImpl : public TaskScheduler {
// fields), aborted_ flag and register_finished_ flag
AtomicWithPadding<int> num_tasks_to_schedule_;
+ // If a task group adds tasks it's possible for a thread inside
+ // ScheduleMore to miss this fact. This serves as a flag to
+ // notify the scheduling thread that it might need to make
+ // another pass through the scheduler
+ AtomicWithPadding<bool> tasks_added_recently_;
};
TaskSchedulerImpl::TaskSchedulerImpl()
@@ -104,6 +109,7 @@ TaskSchedulerImpl::TaskSchedulerImpl()
aborted_(false),
register_finished_(false) {
num_tasks_to_schedule_.value.store(0);
+ tasks_added_recently_.value.store(false);
}
int TaskSchedulerImpl::RegisterTaskGroup(TaskImpl task_impl,
@@ -150,6 +156,7 @@ Status TaskSchedulerImpl::StartTaskGroup(size_t thread_id, int group_id,
}
if (!aborted) {
+ tasks_added_recently_.value.store(true);
return ScheduleMore(thread_id);
} else {
return Status::Cancelled("Scheduler cancelled");
@@ -343,6 +350,16 @@ Status TaskSchedulerImpl::ScheduleMore(size_t thread_id, int num_tasks_finished)
num_tasks_to_schedule_.value += num_new_tasks - static_cast<int>(tasks.size());
}
+ bool expected_might_have_missed_tasks = true;
+ if (tasks_added_recently_.value.compare_exchange_strong(
+ expected_might_have_missed_tasks, false)) {
+ if (tasks.empty()) {
+ // num_tasks_finished has already been added to num_tasks_to_schedule so
+ // pass 0 here.
+ return ScheduleMore(thread_id);
+ }
+ }
+
for (size_t i = 0; i < tasks.size(); ++i) {
int group_id = tasks[i].first;
int64_t task_id = tasks[i].second;
diff --git a/cpp/src/arrow/compute/exec/task_util.h b/cpp/src/arrow/compute/exec/task_util.h
index 44540d255d..5dc66aee16 100644
--- a/cpp/src/arrow/compute/exec/task_util.h
+++ b/cpp/src/arrow/compute/exec/task_util.h
@@ -53,7 +53,7 @@ class AtomicWithPadding {
//
// Also allows for executing next pending tasks immediately using a caller thread.
//
-class TaskScheduler {
+class ARROW_EXPORT TaskScheduler {
public:
using TaskImpl = std::function<Status(size_t, int64_t)>;
using TaskGroupContinuationImpl = std::function<Status(size_t)>;
diff --git a/cpp/src/arrow/compute/exec/task_util_test.cc b/cpp/src/arrow/compute/exec/task_util_test.cc
new file mode 100644
index 0000000000..fd9502bdfa
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/task_util_test.cc
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/task_util.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::ThreadPool;
+
+namespace compute {
+
+/// \brief Create a thread pool with all `num_threads` threads already started
+///
+/// Pools create threads lazily, which makes some issues hard to reproduce, so
+/// each task parks on a flag-guarded gate (immune to spurious wakeups) until
+/// `num_threads` tasks run at once. Returns the pool, or the Make() error.
+Result<std::shared_ptr<ThreadPool>> MakePrimedThreadPool(int num_threads) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> thread_pool,
+                        ThreadPool::Make(num_threads));
+  int num_threads_running = 0;
+  bool gate_open = false;
+  std::mutex mutex;
+  std::condition_variable thread_gate;
+  std::condition_variable primer_gate;
+  for (int i = 0; i < num_threads; i++) {
+    // This shouldn't fail and, if it fails midway, we will have some threads
+    // still running if we do RETURN_NOT_OK so let's do ABORT_NOT_OK
+    ABORT_NOT_OK(thread_pool->Spawn([&] {
+      std::unique_lock<std::mutex> lk(mutex);
+      num_threads_running++;
+      primer_gate.notify_one();
+      thread_gate.wait(lk, [&] { return gate_open; });
+    }));
+  }
+  std::unique_lock<std::mutex> primer_lock(mutex);
+  primer_gate.wait(primer_lock, [&] { return num_threads_running == num_threads; });
+  gate_open = true;
+  thread_gate.notify_all();
+  primer_lock.unlock();
+  thread_pool->WaitForIdle();
+  return thread_pool;
+}
+
+Status SlowTaskImpl(std::size_t, int64_t) {  // task body; both arguments are ignored
+ SleepABit();  // pause before succeeding (SleepABit is defined outside this file)
+ return Status::OK();
+}
+Status FastTaskImpl(std::size_t, int64_t) { return Status::OK(); }  // no-op task body
+// Builds a continuation that decrements `counter`; the call that takes it from
+// 1 to 0 (the last group of a stage) runs start_next_stage(thread_id, next_stage)
+TaskScheduler::TaskGroupContinuationImpl MakeContinuation(
+    std::atomic<int>* counter, std::function<void(std::size_t, int)> start_next_stage,
+    int next_stage) {
+  return [=](std::size_t thread_id) {
+    const bool is_last_group = counter->fetch_sub(1) == 1;
+    if (is_last_group) start_next_stage(thread_id, next_stage);
+    return Status::OK();
+  };
+}
+// Builds a continuation that decrements `counter`; the call that takes it from
+// 1 to 0 (the last group) notifies `finish` while holding `mutex`
+TaskScheduler::TaskGroupContinuationImpl MakeFinalContinuation(
+    std::atomic<int>* counter, std::mutex* mutex, std::condition_variable* finish) {
+  return [counter, mutex, finish](std::size_t thread_id) {
+    if (counter->fetch_sub(1) != 1) return Status::OK();
+    std::lock_guard<std::mutex> guard(*mutex);
+    finish->notify_one();
+    return Status::OK();
+  };
+}
+
+// This test simulates one of the current use patterns of the task scheduler.
+// There are a number of groups. The groups are allocated to stages. All groups
+// in a stage execute concurrently. When all groups in that stage finish the
+// next stage is started.
+TEST(TaskScheduler, Stress) {
+  constexpr int kNumThreads = 8;
+  constexpr int kNumGroups = 8;
+  constexpr int kGroupsPerStage = 3;
+  constexpr int kTasksPerGroup = 32;
+  constexpr int kNumStages = (kNumGroups % kGroupsPerStage == 0)
+                                 ? (kNumGroups / kGroupsPerStage)
+                                 : (kNumGroups / kGroupsPerStage) + 1;
+  constexpr int kTrailingGroups = (kNumGroups % kGroupsPerStage == 0)
+                                      ? kGroupsPerStage
+                                      : kNumGroups % kGroupsPerStage;
+
+  ThreadIndexer thread_indexer;
+  int num_threads = std::min(static_cast<int>(thread_indexer.Capacity()), kNumThreads);
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ThreadPool> thread_pool,
+                       ThreadPool::Make(num_threads));
+
+  std::array<std::atomic<int>, kNumStages - 1> stage_counters;
+  for (int i = 0; i < kNumStages - 1; i++) {
+    stage_counters[i].store(kGroupsPerStage);
+  }
+  std::atomic<int> final_counter(kTrailingGroups);
+  std::mutex mutex;
+  std::condition_variable finish_cv;
+
+  std::vector<int> group_ids;
+  auto scheduler = TaskScheduler::Make();
+
+  std::function<void(std::size_t, int)> start_next_stage = [&](std::size_t thread_id,
+                                                               int stage_index) {
+    int start = stage_index * kGroupsPerStage;
+    int end = std::min(kNumGroups, start + kGroupsPerStage);
+    for (int i = start; i < end; i++) {
+      ASSERT_OK(thread_pool->Spawn([&, i] {
+        std::size_t my_thread_id = thread_indexer();
+        SleepABit();
+        ASSERT_OK(scheduler->StartTaskGroup(my_thread_id, group_ids[i], kTasksPerGroup));
+      }));
+    }
+  };
+
+  for (auto i = 0; i < kNumGroups; i++) {
+    int next_stage = (i / kGroupsPerStage) + 1;
+    TaskScheduler::TaskGroupContinuationImpl finish =
+        MakeFinalContinuation(&final_counter, &mutex, &finish_cv);
+    if (next_stage < kNumStages) {
+      finish =
+          MakeContinuation(&stage_counters[next_stage - 1], start_next_stage, next_stage);
+    }
+    group_ids.push_back(scheduler->RegisterTaskGroup(SlowTaskImpl, finish));
+  }
+  scheduler->RegisterEnd();
+
+  TaskScheduler::ScheduleImpl schedule =
+      [&](TaskScheduler::TaskGroupContinuationImpl task) {
+        return thread_pool->Spawn([&, task] {
+          std::size_t thread_id = thread_indexer();
+          ASSERT_OK(task(thread_id));
+        });
+      };
+  std::unique_lock<std::mutex> lock(mutex);
+  ASSERT_OK(thread_pool->Spawn([&] {
+    std::size_t thread_id = thread_indexer();
+    ASSERT_OK(scheduler->StartScheduling(thread_id, schedule, num_threads * 4, false));
+    start_next_stage(thread_id, 0);
+  }));
+
+  // Wait on the counter itself so that a spurious wakeup is not taken for the end
+  finish_cv.wait(lock, [&] { return final_counter.load() == 0; });
+  lock.unlock();  // the final continuation may still need the mutex to notify
+  thread_pool->WaitForIdle();
+}
+
+// This is a reproducer for a bug that was encountered when one thread starts a
+// task group while another thread is finishing the last of its tasks.
+TEST(TaskScheduler, StressTwo) {
+  constexpr int kNumThreads = 16;
+  constexpr int kNumGroups = 8;
+  constexpr int kTasksPerGroup = 1;
+  constexpr int kIterations = 1000;
+
+  ThreadIndexer thread_indexer;
+  int num_threads = std::min(static_cast<int>(thread_indexer.Capacity()), kNumThreads);
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ThreadPool> thread_pool,
+                       MakePrimedThreadPool(num_threads));
+
+  for (int iteration = 0; iteration < kIterations; iteration++) {
+    std::atomic<int> final_counter(kNumGroups);
+    std::mutex mutex;
+    std::condition_variable finish_cv;
+
+    std::vector<int> group_ids;
+    auto scheduler = TaskScheduler::Make();
+
+    for (int group = 0; group < kNumGroups; group++) {
+      TaskScheduler::TaskGroupContinuationImpl finish =
+          MakeFinalContinuation(&final_counter, &mutex, &finish_cv);
+      group_ids.push_back(scheduler->RegisterTaskGroup(FastTaskImpl, finish));
+    }
+    scheduler->RegisterEnd();
+
+    TaskScheduler::ScheduleImpl schedule =
+        [&](TaskScheduler::TaskGroupContinuationImpl task) {
+          return thread_pool->Spawn([&, task] {
+            std::size_t thread_id = thread_indexer();
+            ASSERT_OK(task(thread_id));
+          });
+        };
+
+    ASSERT_OK(scheduler->StartScheduling(0, schedule, num_threads * 4, false));
+    std::unique_lock<std::mutex> lock(mutex);
+    for (int group = 0; group < kNumGroups; group++) {
+      ASSERT_OK(thread_pool->Spawn([&, group] {
+        std::size_t thread_id = thread_indexer();
+        ASSERT_OK(scheduler->StartTaskGroup(thread_id, group_ids[group], kTasksPerGroup));
+      }));
+    }
+
+    // Wait on the counter itself so that a spurious wakeup is not taken for the end
+    finish_cv.wait(lock, [&] { return final_counter.load() == 0; });
+    lock.unlock();  // the final continuation may still need the mutex to notify
+    thread_pool->WaitForIdle();
+  }
+}
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 3ec7016e6a..4e7550582a 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -280,7 +280,7 @@ class ARROW_EXPORT AtomicCounter {
std::atomic<bool> complete_{false};
};
-class ThreadIndexer {
+class ARROW_EXPORT ThreadIndexer {
public:
size_t operator()();