Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/13 23:10:41 UTC

[arrow] branch master updated: ARROW-16498: [C++] Fix potential deadlock in arrow::compute::TaskScheduler

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 07135b254d ARROW-16498: [C++] Fix potential deadlock in arrow::compute::TaskScheduler
07135b254d is described below

commit 07135b254d17198ce6d4cb459c5d6c6955389bc3
Author: Weston Pace <we...@gmail.com>
AuthorDate: Fri May 13 13:10:20 2022 -1000

    ARROW-16498: [C++] Fix potential deadlock in arrow::compute::TaskScheduler
    
    Closes #13091 from westonpace/bugfix/ARROW-16498--task-scheduler-deadlock
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/CMakeLists.txt    |   7 +-
 cpp/src/arrow/compute/exec/task_util.cc      |  17 ++
 cpp/src/arrow/compute/exec/task_util.h       |   2 +-
 cpp/src/arrow/compute/exec/task_util_test.cc | 228 +++++++++++++++++++++++++++
 cpp/src/arrow/compute/exec/util.h            |   2 +-
 5 files changed, 253 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt
index b2a21c2bd6..0a5f1f30ff 100644
--- a/cpp/src/arrow/compute/exec/CMakeLists.txt
+++ b/cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -34,7 +34,12 @@ add_arrow_compute_test(hash_join_node_test
                        key_hash_test.cc)
 add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute")
 add_arrow_compute_test(union_node_test PREFIX "arrow-compute")
-add_arrow_compute_test(util_test PREFIX "arrow-compute")
+add_arrow_compute_test(util_test
+                       PREFIX
+                       "arrow-compute"
+                       SOURCES
+                       util_test.cc
+                       task_util_test.cc)
 
 add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
 
diff --git a/cpp/src/arrow/compute/exec/task_util.cc b/cpp/src/arrow/compute/exec/task_util.cc
index e5e714d34a..8568450119 100644
--- a/cpp/src/arrow/compute/exec/task_util.cc
+++ b/cpp/src/arrow/compute/exec/task_util.cc
@@ -96,6 +96,11 @@ class TaskSchedulerImpl : public TaskScheduler {
                       // fields), aborted_ flag and register_finished_ flag
 
   AtomicWithPadding<int> num_tasks_to_schedule_;
+  // If a task group adds tasks it's possible for a thread inside
+  // ScheduleMore to miss this fact.  This serves as a flag to
+  // notify the scheduling thread that it might need to make
+  // another pass through the scheduler
+  AtomicWithPadding<bool> tasks_added_recently_;
 };
 
 TaskSchedulerImpl::TaskSchedulerImpl()
@@ -104,6 +109,7 @@ TaskSchedulerImpl::TaskSchedulerImpl()
       aborted_(false),
       register_finished_(false) {
   num_tasks_to_schedule_.value.store(0);
+  tasks_added_recently_.value.store(false);
 }
 
 int TaskSchedulerImpl::RegisterTaskGroup(TaskImpl task_impl,
@@ -150,6 +156,7 @@ Status TaskSchedulerImpl::StartTaskGroup(size_t thread_id, int group_id,
   }
 
   if (!aborted) {
+    tasks_added_recently_.value.store(true);
     return ScheduleMore(thread_id);
   } else {
     return Status::Cancelled("Scheduler cancelled");
@@ -343,6 +350,16 @@ Status TaskSchedulerImpl::ScheduleMore(size_t thread_id, int num_tasks_finished)
     num_tasks_to_schedule_.value += num_new_tasks - static_cast<int>(tasks.size());
   }
 
+  bool expected_might_have_missed_tasks = true;
+  if (tasks_added_recently_.value.compare_exchange_strong(
+          expected_might_have_missed_tasks, false)) {
+    if (tasks.empty()) {
+      // num_tasks_finished has already been added to num_tasks_to_schedule so
+      // pass 0 here.
+      return ScheduleMore(thread_id);
+    }
+  }
+
   for (size_t i = 0; i < tasks.size(); ++i) {
     int group_id = tasks[i].first;
     int64_t task_id = tasks[i].second;
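
The added block above is the heart of the fix: StartTaskGroup raises tasks_added_recently_ after publishing its tasks, and a thread in ScheduleMore that found nothing to run consumes the flag with compare_exchange_strong and makes one more pass instead of going idle, so tasks published concurrently are not silently dropped. What follows is a minimal standalone sketch of that flag-and-recheck pattern, assuming a toy work queue; names such as PublishWork and DrainUntilQuiet are illustrative only and not part of the Arrow API.

    #include <atomic>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Toy stand-ins for the scheduler's task list and the
    // tasks_added_recently_ flag added in this commit.
    std::mutex work_mutex;
    std::vector<int> work_queue;                 // guarded by work_mutex
    std::atomic<bool> work_added_recently{false};

    // Mirrors StartTaskGroup: publish the work first, then raise the flag.
    void PublishWork(int item) {
      {
        std::lock_guard<std::mutex> lock(work_mutex);
        work_queue.push_back(item);
      }
      work_added_recently.store(true);
    }

    // Pops and handles one item if any is available.
    bool TryDrainOnce() {
      std::lock_guard<std::mutex> lock(work_mutex);
      if (work_queue.empty()) return false;
      std::cout << "processed " << work_queue.back() << "\n";
      work_queue.pop_back();
      return true;
    }

    // Mirrors ScheduleMore: before going idle on an empty list,
    // consume the flag; if it was set, scan once more.
    void DrainUntilQuiet() {
      while (true) {
        if (TryDrainOnce()) continue;
        bool expected = true;
        if (work_added_recently.compare_exchange_strong(expected, false)) {
          continue;  // work may have been published just after our scan
        }
        return;  // flag was already false: safe to go idle
      }
    }

    int main() {
      std::thread producer([] {
        for (int i = 0; i < 100; ++i) PublishWork(i);
      });
      DrainUntilQuiet();  // consumer runs concurrently with the producer
      producer.join();
      DrainUntilQuiet();  // drain anything published after the first pass went idle
      return 0;
    }

In the commit itself the remaining window is covered because the StartTaskGroup thread that set the flag immediately calls ScheduleMore as well, so at least one thread always re-examines the task list after new tasks appear.
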
diff --git a/cpp/src/arrow/compute/exec/task_util.h b/cpp/src/arrow/compute/exec/task_util.h
index 44540d255d..5dc66aee16 100644
--- a/cpp/src/arrow/compute/exec/task_util.h
+++ b/cpp/src/arrow/compute/exec/task_util.h
@@ -53,7 +53,7 @@ class AtomicWithPadding {
 //
 // Also allows for executing next pending tasks immediately using a caller thread.
 //
-class TaskScheduler {
+class ARROW_EXPORT TaskScheduler {
  public:
   using TaskImpl = std::function<Status(size_t, int64_t)>;
   using TaskGroupContinuationImpl = std::function<Status(size_t)>;
diff --git a/cpp/src/arrow/compute/exec/task_util_test.cc b/cpp/src/arrow/compute/exec/task_util_test.cc
new file mode 100644
index 0000000000..fd9502bdfa
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/task_util_test.cc
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/task_util.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::ThreadPool;
+
+namespace compute {
+
+/// \brief Create a thread pool and start all threads
+///
+/// By default a thread pool will not create threads until they
+/// are actually needed.  This can make it a bit difficult to
+/// reproduce certain issues.  This creates a thread pool and
+/// then makes sure the threads are actually created before
+/// returning it.
+Result<std::shared_ptr<ThreadPool>> MakePrimedThreadPool(int num_threads) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> thread_pool,
+                        ThreadPool::Make(num_threads));
+  int num_threads_running = 0;
+  std::mutex mutex;
+  std::condition_variable thread_gate;
+  std::condition_variable primer_gate;
+  for (int i = 0; i < num_threads; i++) {
+    // This shouldn't fail but, if it failed midway, RETURN_NOT_OK would leave
+    // some threads still running, so let's use ABORT_NOT_OK instead
+    ABORT_NOT_OK(thread_pool->Spawn([&] {
+      std::unique_lock<std::mutex> lk(mutex);
+      num_threads_running++;
+      primer_gate.notify_one();
+      thread_gate.wait(lk);
+    }));
+  }
+  std::unique_lock<std::mutex> primer_lock(mutex);
+  primer_gate.wait(primer_lock, [&] { return num_threads_running == num_threads; });
+  thread_gate.notify_all();
+  primer_lock.unlock();
+  thread_pool->WaitForIdle();
+  return thread_pool;
+}
+
+Status SlowTaskImpl(std::size_t, int64_t) {
+  SleepABit();
+  return Status::OK();
+}
+Status FastTaskImpl(std::size_t, int64_t) { return Status::OK(); }
+// If this is the last task group then start the next stage
+TaskScheduler::TaskGroupContinuationImpl MakeContinuation(
+    std::atomic<int>* counter, std::function<void(std::size_t, int)> start_next_stage,
+    int next_stage) {
+  return [counter, start_next_stage, next_stage](std::size_t thread_id) {
+    if (counter->fetch_sub(1) == 1) {
+      start_next_stage(thread_id, next_stage);
+    }
+    return Status::OK();
+  };
+}
+// Signal the cv if this is the last group
+TaskScheduler::TaskGroupContinuationImpl MakeFinalContinuation(
+    std::atomic<int>* counter, std::mutex* mutex, std::condition_variable* finish) {
+  return [=](std::size_t thread_id) {
+    if (counter->fetch_sub(1) == 1) {
+      std::lock_guard<std::mutex> lg(*mutex);
+      finish->notify_one();
+    }
+    return Status::OK();
+  };
+}
+
+// This test simulates one of the current use patterns of the
+// task scheduler.  There are a number of groups.  The groups
+// are allocated to stages.  All groups in a stage execute
+// concurrently.  When all groups in that stage finish, the next
+// stage is started.
+TEST(TaskScheduler, Stress) {
+  constexpr int kNumThreads = 8;
+  constexpr int kNumGroups = 8;
+  constexpr int kGroupsPerStage = 3;
+  constexpr int kTasksPerGroup = 32;
+  constexpr int kNumStages = (kNumGroups % kGroupsPerStage == 0)
+                                 ? (kNumGroups / kGroupsPerStage)
+                                 : (kNumGroups / kGroupsPerStage) + 1;
+  constexpr int kTrailingGroups = (kNumGroups % kGroupsPerStage == 0)
+                                      ? kGroupsPerStage
+                                      : kNumGroups % kGroupsPerStage;
+
+  ThreadIndexer thread_indexer;
+  int num_threads = std::min(static_cast<int>(thread_indexer.Capacity()), kNumThreads);
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ThreadPool> thread_pool,
+                       ThreadPool::Make(num_threads));
+
+  std::array<std::atomic<int>, kNumStages - 1> stage_counters;
+  for (int i = 0; i < kNumStages - 1; i++) {
+    stage_counters[i].store(kGroupsPerStage);
+  }
+  std::atomic<int> final_counter(kTrailingGroups);
+  std::mutex mutex;
+  std::condition_variable finish_cv;
+
+  std::vector<int> group_ids;
+  auto scheduler = TaskScheduler::Make();
+
+  std::function<void(std::size_t, int)> start_next_stage = [&](std::size_t thread_id,
+                                                               int stage_index) {
+    int start = stage_index * kGroupsPerStage;
+    int end = std::min(kNumGroups, start + kGroupsPerStage);
+    for (int i = start; i < end; i++) {
+      ASSERT_OK(thread_pool->Spawn([&, i] {
+        std::size_t my_thread_id = thread_indexer();
+        SleepABit();
+        ASSERT_OK(scheduler->StartTaskGroup(my_thread_id, group_ids[i], kTasksPerGroup));
+      }));
+    }
+  };
+
+  for (auto i = 0; i < kNumGroups; i++) {
+    int next_stage = (i / kGroupsPerStage) + 1;
+    TaskScheduler::TaskGroupContinuationImpl finish =
+        MakeFinalContinuation(&final_counter, &mutex, &finish_cv);
+    if (next_stage < kNumStages) {
+      finish =
+          MakeContinuation(&stage_counters[next_stage - 1], start_next_stage, next_stage);
+    }
+    group_ids.push_back(scheduler->RegisterTaskGroup(SlowTaskImpl, finish));
+  }
+  scheduler->RegisterEnd();
+
+  TaskScheduler::AbortContinuationImpl abort = [] { FAIL() << "Unexpected abort"; };
+  TaskScheduler::ScheduleImpl schedule =
+      [&](TaskScheduler::TaskGroupContinuationImpl task) {
+        return thread_pool->Spawn([&, task] {
+          std::size_t thread_id = thread_indexer();
+          ASSERT_OK(task(thread_id));
+        });
+      };
+  std::unique_lock<std::mutex> lock(mutex);
+  ASSERT_OK(thread_pool->Spawn([&] {
+    std::size_t thread_id = thread_indexer();
+    ASSERT_OK(scheduler->StartScheduling(thread_id, schedule, num_threads * 4, false));
+    start_next_stage(thread_id, 0);
+  }));
+
+  finish_cv.wait(lock);
+  thread_pool->WaitForIdle();
+}
+
+// This is a reproducer for a bug that was encountered when one
+// thread starts a task group while another thread is finishing
+// the last of its tasks.
+TEST(TaskScheduler, StressTwo) {
+  constexpr int kNumThreads = 16;
+  constexpr int kNumGroups = 8;
+  constexpr int kTasksPerGroup = 1;
+  constexpr int kIterations = 1000;
+
+  ThreadIndexer thread_indexer;
+  int num_threads = std::min(static_cast<int>(thread_indexer.Capacity()), kNumThreads);
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ThreadPool> thread_pool,
+                       MakePrimedThreadPool(num_threads));
+
+  for (int i = 0; i < kIterations; i++) {
+    std::atomic<int> final_counter(kNumGroups);
+    std::mutex mutex;
+    std::condition_variable finish_cv;
+
+    std::vector<int> group_ids;
+    auto scheduler = TaskScheduler::Make();
+
+    for (auto i = 0; i < kNumGroups; i++) {
+      TaskScheduler::TaskGroupContinuationImpl finish =
+          MakeFinalContinuation(&final_counter, &mutex, &finish_cv);
+      group_ids.push_back(scheduler->RegisterTaskGroup(FastTaskImpl, finish));
+    }
+    scheduler->RegisterEnd();
+
+    TaskScheduler::AbortContinuationImpl abort = [] { FAIL() << "Unexpected abort"; };
+    TaskScheduler::ScheduleImpl schedule =
+        [&](TaskScheduler::TaskGroupContinuationImpl task) {
+          return thread_pool->Spawn([&, task] {
+            std::size_t thread_id = thread_indexer();
+            ASSERT_OK(task(thread_id));
+          });
+        };
+
+    ASSERT_OK(scheduler->StartScheduling(0, schedule, num_threads * 4, false));
+    std::unique_lock<std::mutex> lock(mutex);
+    for (int i = 0; i < kNumGroups; i++) {
+      ASSERT_OK(thread_pool->Spawn([&, i] {
+        std::size_t thread_id = thread_indexer();
+        ASSERT_OK(scheduler->StartTaskGroup(thread_id, i, kTasksPerGroup));
+      }));
+    }
+
+    finish_cv.wait(lock);
+    thread_pool->WaitForIdle();
+  }
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 3ec7016e6a..4e7550582a 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -280,7 +280,7 @@ class ARROW_EXPORT AtomicCounter {
   std::atomic<bool> complete_{false};
 };
 
-class ThreadIndexer {
+class ARROW_EXPORT ThreadIndexer {
  public:
   size_t operator()();