You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/06 14:58:43 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #9892: ARROW-12208: [C++] Add the ability to run async tasks without using the CPU thread pool

pitrou commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r607892026



##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -44,6 +44,63 @@ struct Task {
 
 }  // namespace
 
+struct SerialExecutor::State {
+  std::queue<Task> task_queue;
+  std::mutex mutex;
+  std::condition_variable wait_for_tasks;
+};
+
+SerialExecutor::SerialExecutor() : state_(new State()) {}
+SerialExecutor::~SerialExecutor() {}
+
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+                                 StopToken stop_token, StopCallback&& stop_callback) {
+  // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called
+  // from external threads (e.g. when transferring back from blocking I/O threads) so a
+  // mutex is needed
+  {
+    std::lock_guard<std::mutex> lg(state_->mutex);
+    state_->task_queue.push(
+        Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
+  }
+  state_->wait_for_tasks.notify_one();
+  return Status::OK();
+}
+
+void SerialExecutor::MarkFinished(bool& finished) {

Review comment:
       This should be `bool* finished`, since the function mutates the argument.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+  template <typename T = ::arrow::detail::Empty>
+  using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+  template <typename T = ::arrow::detail::Empty>
+  using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;

Review comment:
       "Scheduler" is a confusing name. Perhaps "TopLevelTask"?

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -29,10 +29,12 @@
 #include "arrow/dataset/partition.h"
 #include "arrow/dataset/scanner.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/thread_pool.h"

Review comment:
       Is this include required?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
   std::vector<int> outs_;
 };
 
+template <typename T = arrow::detail::Empty>
+struct TerminalCallback {
+  void operator()() {
+    auto result = std::move(callback)();
+    std::move(finish_signal)(result);
+  }
+
+  FnOnce<Result<T>()> callback;
+  SerialExecutor::FinishSignal<T> finish_signal;
+};
+
+template <>
+struct TerminalCallback<arrow::detail::Empty> {
+  void operator()() {
+    auto st = std::move(callback)();
+    if (!st.ok()) {
+      std::move(finish_signal)(st);
+    } else {
+      std::move(finish_signal)(arrow::detail::Empty());
+    }
+  }
+
+  FnOnce<Status()> callback;
+  SerialExecutor::FinishSignal<> finish_signal;
+};
+
+TEST(TestSerialExecutor, Create) {
+  bool task_ran = false;
+  SerialExecutor::Scheduler<> task = [&](Executor* executor,
+                                         SerialExecutor::FinishSignal<> finish_signal) {
+    EXPECT_TRUE(executor != nullptr);
+    task_ran = true;
+    std::move(finish_signal)(arrow::detail::Empty());
+    return Status::OK();
+  };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(task)));
+  EXPECT_TRUE(task_ran);
+}
+
+TEST(TestSerialExecutor, SpawnNested) {
+  bool nested_ran = false;
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        return executor->Spawn(TerminalCallback<>{[&] {
+                                                    nested_ran = true;
+                                                    return Status::OK();
+                                                  },
+                                                  std::move(finish_signal)});
+      };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+  EXPECT_TRUE(nested_ran);
+}
+
+TEST(TestSerialExecutor, WithResult) {
+  SerialExecutor::Scheduler<int> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<int> finish_signal) {
+        return executor->Spawn(
+            TerminalCallback<int>{[] { return 42; }, std::move(finish_signal)});
+      };
+  ASSERT_OK_AND_EQ(42, SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+}
+
+TEST(TestSerialExecutor, StopToken) {
+  bool nested_ran = false;
+  StopSource stop_source;
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, stop_source.token()));
+        RETURN_NOT_OK(executor->Spawn(
+            TerminalCallback<>{[&] { return Status::OK(); }, std::move(finish_signal)}));
+        stop_source.RequestStop(Status::Invalid("XYZ"));
+        return Status::OK();
+      };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+  EXPECT_FALSE(nested_ran);
+}
+
+TEST(TestSerialExecutor, ContinueAfterExternal) {
+  bool continuation_ran = false;
+  EXPECT_OK_AND_ASSIGN(auto mockIoPool, ThreadPool::Make(1));
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        struct Callback {
+          void operator()(const Result<arrow::detail::Empty>& emp) {
+            continuation_ran = true;
+            std::move(finish_signal)(emp);
+          }
+          SerialExecutor::FinishSignal<> finish_signal;
+          bool& continuation_ran;
+        };
+        executor
+            ->Transfer(DeferNotOk(mockIoPool->Submit([&] {
+              SleepABit();
+              return Status::OK();
+            })))
+            .AddCallback(Callback{std::move(finish_signal), continuation_ran});
+        return Status::OK();
+      };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+  EXPECT_TRUE(continuation_ran);
+}
+
+TEST(TestSerialExecutor, SchedulerAbort) {
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        return Status::Invalid("XYZ");
+      };
+  ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+}
+
+TEST(TestSerialExecutor, PropagatedError) {
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        std::move(finish_signal)(Status::Invalid("XYZ"));
+        return Status::OK();
+      };
+  ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler)));

Review comment:
       Hmm... instead of testing `RunInSerialExecutor` explicitly, can all these tests call `RunSynchronously`, so that you can run the same tests for different values of `use_threads`?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+  struct InnerCallback {
+    void operator()(const Result<T> res) { std::move(finish_signal)(std::move(res)); }
+    SerialExecutor::FinishSignal<T> finish_signal;
+  };
+  struct OuterCallback {
+    Status operator()(Executor* executor, SerialExecutor::FinishSignal<T> finish_signal) {
+      auto fut = std::move(get_future)(executor);
+      fut.AddCallback(InnerCallback{std::move(finish_signal)});
+      return Status::OK();
+    }
+    FnOnce<Future<T>(Executor*)> get_future;
+  };
+  return SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true

Review comment:
       This isn't very descriptive. Please write something such as:
   ```c++
   /// \brief Spawn a future and any dependent tasks on an executor, wait for completion
   ///
   /// If `use_threads` is true, the global CPU executor is used.
   /// If `use_threads` is false, a temporary SerialExecutor is used.
   /// `get_future` is called (from this thread) with the chosen executor and must
   /// return a future that will eventually finish. This function returns once the
   /// future has finished.
   ```
   

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially

Review comment:
       Nit: "Run" instead of "Runs".

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are

Review comment:
       This is true of a parallel executor as well, no? 

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -166,14 +174,15 @@ class CsvScanTask : public ScanTask {
         source_(fragment->source()) {}
 
   Result<RecordBatchIterator> Execute() override {
-    ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
+    ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool()));

Review comment:
       Shouldn't you look up `ScanOptions::use_threads`? Or am I missing something?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+  struct InnerCallback {
+    void operator()(const Result<T> res) { std::move(finish_signal)(std::move(res)); }
+    SerialExecutor::FinishSignal<T> finish_signal;
+  };
+  struct OuterCallback {
+    Status operator()(Executor* executor, SerialExecutor::FinishSignal<T> finish_signal) {
+      auto fut = std::move(get_future)(executor);
+      fut.AddCallback(InnerCallback{std::move(finish_signal)});
+      return Status::OK();
+    }
+    FnOnce<Future<T>(Executor*)> get_future;
+  };
+  return SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true
+/// \see RunSerially
+///
+/// If `use_threads` is false then the operation is run normally but this method will
+/// still block the calling thread until the operation has completed.
+template <typename T>
+Result<T> RunSynchronously(FnOnce<Future<T>(Executor*)> get_future, bool use_threads) {

Review comment:
       Why not return a `Future<T>` instead? It seems that would at least remove the `FinishSignal` complication.

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
   std::vector<int> outs_;
 };
 
+template <typename T = arrow::detail::Empty>

Review comment:
       Can you add comments explaining this?

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -32,6 +32,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/async_generator.h"
+#include "arrow/util/thread_pool.h"

Review comment:
       No need for this include: `internal::Executor` is already declared in `arrow/util/type_fwd.h`.
   Also, can you try to remove the `async_generator.h` include?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
   std::vector<int> outs_;
 };
 
+template <typename T = arrow::detail::Empty>
+struct TerminalCallback {
+  void operator()() {
+    auto result = std::move(callback)();
+    std::move(finish_signal)(result);
+  }
+
+  FnOnce<Result<T>()> callback;
+  SerialExecutor::FinishSignal<T> finish_signal;
+};
+
+template <>
+struct TerminalCallback<arrow::detail::Empty> {
+  void operator()() {
+    auto st = std::move(callback)();
+    if (!st.ok()) {
+      std::move(finish_signal)(st);
+    } else {
+      std::move(finish_signal)(arrow::detail::Empty());
+    }
+  }
+
+  FnOnce<Status()> callback;
+  SerialExecutor::FinishSignal<> finish_signal;
+};
+
+TEST(TestSerialExecutor, Create) {
+  bool task_ran = false;
+  SerialExecutor::Scheduler<> task = [&](Executor* executor,
+                                         SerialExecutor::FinishSignal<> finish_signal) {
+    EXPECT_TRUE(executor != nullptr);

Review comment:
       `ASSERT_NE`

##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -44,6 +44,63 @@ struct Task {
 
 }  // namespace
 
+struct SerialExecutor::State {
+  std::queue<Task> task_queue;
+  std::mutex mutex;
+  std::condition_variable wait_for_tasks;
+};
+
+SerialExecutor::SerialExecutor() : state_(new State()) {}
+SerialExecutor::~SerialExecutor() {}
+
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+                                 StopToken stop_token, StopCallback&& stop_callback) {
+  // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called
+  // from external threads (e.g. when transferring back from blocking I/O threads) so a
+  // mutex is needed
+  {
+    std::lock_guard<std::mutex> lg(state_->mutex);
+    state_->task_queue.push(
+        Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
+  }
+  state_->wait_for_tasks.notify_one();
+  return Status::OK();
+}
+
+void SerialExecutor::MarkFinished(bool& finished) {
+  {
+    std::lock_guard<std::mutex> lk(state_->mutex);
+    finished = true;
+  }
+  state_->wait_for_tasks.notify_one();
+}
+
+void SerialExecutor::RunLoop(const bool& finished) {
+  std::unique_lock<std::mutex> lk(state_->mutex);
+
+  while (!finished) {
+    while (!state_->task_queue.empty()) {
+      Task& task = state_->task_queue.front();

Review comment:
       The `task_queue` can be mutated from another thread, and I don't think there are any guarantees about reference stability.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+  template <typename T = ::arrow::detail::Empty>
+  using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+  template <typename T = ::arrow::detail::Empty>
+  using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;
+
+  SerialExecutor();
+  ~SerialExecutor();
+
+  int GetCapacity() override { return 1; };
+  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+                   StopCallback&&) override;
+
+  /// \brief Runs the scheduler and any scheduled tasks
+  ///
+  /// The scheduler must either return an invalid status or call the finish signal.
+  /// Failure to do this will result in a deadlock.  For this reason it is preferable (if
+  /// possible) to use the helper methods (below) RunSynchronously/RunSerially which
+  /// delegates the responsiblity onto a Future producer's existing responsibility to
+  /// always mark a future finished (which can someday be aided by ARROW-12207).
+  template <typename T>
+  static Result<T> RunInSerialExecutor(Scheduler<T> initial_task) {
+    auto serial_executor = std::make_shared<SerialExecutor>();

Review comment:
       Nit: this can probably be a plain local variable, `SerialExecutor serial_executor;`, instead of a `std::make_shared` allocation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org