You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/09 13:51:30 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

pitrou commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648313166



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -106,12 +107,18 @@ template <typename T>
 class SimpleExecutor {
  public:
   explicit SimpleExecutor(int nfutures)
-      : pool_(ThreadPool::Make(/*threads=*/4).ValueOrDie()) {
+      : pool_(SimpleThreadPool::Make(/*threads=*/4).ValueOrDie()) {
     for (int i = 0; i < nfutures; ++i) {
       futures_.push_back(Future<T>::Make());
     }
   }
 
+  // Need to explicitly call the pool shutdown.  The worker threads have a shared_ptr
+  // to the pool so they will keep it alive after this class is destructed.  However, the

Review comment:
       Hmm, does that mean there is a reference cycle which would make the pool immortal? Or just that the pool would be destroyed too late? (hopefully the latter)

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);

Review comment:
       `wait` is prone to spurious wakeups, so it should always be called with a predicate (or the predicate checked separately in a loop).

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();

Review comment:
       Hmm... how do you know the waiter that gets woken up won't be the `wait` call just below?
   
   Also, I'm not sure that `notify_one` *guarantees* that only one thread will be notified. For example, the [POSIX equivalent](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_signal.html) says:
   > The `pthread_cond_signal()` function shall unblock *at least one* of the threads that are blocked on the specified condition variable
   
   (emphasis mine)
   

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);
+      completed = true;
+    }));
+  }
+
+  std::unique_lock<std::mutex> lock(mx);
+  cv.wait(lock, [&ready] { return ready; });
+  // Thread pool has shut down, now we unblock the task
+  cv.notify_one();
+  lock.unlock();
+
+  // The worker thread should be able to keep the thread pool alive by itself

Review comment:
       Hmm... really? Why is that? If the user doesn't hold any reference to a thread pool anymore, then presumably the thread pool should "disappear". Whether it does before or after the currently running tasks finish, I don't know.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -321,37 +335,139 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
-  struct State;
+  // ------------- Statistics API ---------------
+
+  /// The current number of tasks either currently running or in the queue to run
+  uint64_t NumTasksRunningOrQueued() const;
+  /// A guess at the maximum number of tasks running or queued at any one point
+  uint64_t MaxTasksQueued() const;
+  /// The total number of tasks that have been submitted over the lifetime of the pool
+  uint64_t TotalTasksQueued() const;
+
+  // ------------- Children API -----------------
+
+  /// Called by children when a worker thread completes a task
+  void RecordFinishedTask();
+  /// True if the thread pool is shutting down, should only be checked if a thread has
+  /// no tasks to work on.  This allows us to ensure we drain the task queue before
+  /// shutting down the pool.
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuitNow) again
+  bool ShouldWorkerQuit(ThreadIt* thread_it);
+  /// True if the thread is no longer needed (e.g. excess capacity) or if a quick shutdown
+  /// has been requested.  Should be checked frequently as threads can quit with remaining
+  /// work if this is true
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuit) again
+  bool ShouldWorkerQuitNow(ThreadIt* thread_it);
+  /// Should be called first by a worker thread as soon as it starts up.  Until this call
+  /// finishes `thread_it` will not have a valid value
+  void WaitForReady();
+  /// Called by a child implementation when new work arrives that should wake up idle
+  /// threads.  This will notify one worker waiting on WaitForWork.  Generally called
+  /// in DoSubmitTask but might be called less often if a child implementation wants
+  /// to
+  void NotifyIdleWorker();
+  /// Called by a worker thread that is ready to wait for work to arrive
+  void WaitForWork();
+
+  struct Control;
 
  protected:
+  FRIEND_TEST(TestThreadPool, DestroyWithoutShutdown);
   FRIEND_TEST(TestThreadPool, SetCapacity);
   FRIEND_TEST(TestGlobalThreadPool, Capacity);
   friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
-  ThreadPool();
+  explicit ThreadPool(bool eternal = false);
 
   Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                    StopCallback&&) override;
 
+  /// Called on the child process after a fork.  After a fork all threads will have ceased
+  /// running in the child process.  This method should clean up the thread pool state and
+  /// restart any previously running threads.
+  ///
+  /// The behavior is somewhat ill-defined if tasks are running when the fork happened.
+  /// For more details see ARROW-12879
+  virtual void ResetAfterFork();
+
+  /// Launches a worker thread
+  virtual std::shared_ptr<Thread> LaunchWorker(ThreadIt thread_it) = 0;
+  /// Adds a task to the task queue(s)
+  virtual void DoSubmitTask(TaskHints hints, Task task) = 0;
+  /// Should return true only if there is no work to be done
+  virtual bool Empty() = 0;
   // Collect finished worker threads, making sure the OS threads have exited
   void CollectFinishedWorkersUnlocked();
   // Launch a given number of additional workers
   void LaunchWorkersUnlocked(int threads);
+  // Marks a thread finished and removes it from the workers list
+  void MarkThreadFinishedUnlocked(ThreadIt* thread_it);
   // Get the current actual capacity
-  int GetActualCapacity();
+  int GetActualCapacity() const;
+  // Get the amount of threads we could still launch based on capacity and # of tasks
+  int GetAdditionalThreadsNeeded() const;
   // Reinitialize the thread pool if the pid changed
   void ProtectAgainstFork();
+  void RecordTaskSubmitted();
 
   static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
 
-  std::shared_ptr<State> sp_state_;
-  State* state_;
+  std::atomic<uint64_t> num_tasks_running_;

Review comment:
       Instead of exposing this in a `.h`, I'd rather see those instance variables and methods separated into their own object (call it "implementation", "task queue" or whatever you prefer :-)). It would also avoid the `Thread` wrapper.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org