Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/26 01:08:06 UTC

[GitHub] [arrow] westonpace opened a new pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

westonpace opened a new pull request #10401:
URL: https://github.com/apache/arrow/pull/10401


   This PR splits `arrow::internal::ThreadPool` into a base class (`ThreadPool`) and a concrete implementation (`SimpleThreadPool`).  The base class handles capacity, statistics, and lifecycle.  The concrete implementations handle task queuing and worker loops.  This is a step towards a WorkStealingThreadPool class coming soon.  I'm leaving this in draft until the work stealing version is available.
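
   For illustration, the split described above might look roughly like the sketch below. This is not code from the PR: `PoolBase`, `SimplePool`, `Submit`, `Drain` and `RunSimplePoolDemo` are hypothetical names, and the example is deliberately single-threaded so that it only shows which responsibilities sit in the base class (bookkeeping) and which sit in the concrete class (queuing).

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical names throughout; this is not the Arrow API.
using Task = std::function<void()>;

// Base class: owns the bookkeeping shared by every pool flavor.
class PoolBase {
 public:
  virtual ~PoolBase() = default;

  void Submit(Task task) {
    ++total_submitted_;
    DoSubmitTask(std::move(task));  // queuing strategy is up to the subclass
  }
  uint64_t total_submitted() const { return total_submitted_; }

 protected:
  virtual void DoSubmitTask(Task task) = 0;

 private:
  uint64_t total_submitted_ = 0;
};

// Concrete implementation: a single FIFO queue, drained on demand.
class SimplePool : public PoolBase {
 public:
  // Runs queued tasks on the calling thread and returns how many ran.
  int Drain() {
    int ran = 0;
    while (!queue_.empty()) {
      Task task = std::move(queue_.front());
      queue_.pop_front();
      task();
      ++ran;
    }
    return ran;
  }

 protected:
  void DoSubmitTask(Task task) override { queue_.push_back(std::move(task)); }

 private:
  std::deque<Task> queue_;
};

// Small driver: submit three tasks, drain them, return the accumulated sum.
int RunSimplePoolDemo() {
  SimplePool pool;
  int sum = 0;
  for (int i = 1; i <= 3; ++i) {
    pool.Submit([&sum, i] { sum += i; });
  }
  int ran = pool.Drain();
  assert(ran == 3);
  assert(pool.total_submitted() == 3);
  return sum;
}
```

   A work-stealing variant would, under the same assumptions, override only `DoSubmitTask` and supply its own worker loop.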


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-857902006


   @pitrou Unfortunately, it passes when run locally in that way.  My development system is also Ubuntu 20.04 so I've tried various stress runs but haven't had any luck.  I'm hopeful this last change will put the stack traces in the build log as it does seem to be failing reliably on the build server.





[GitHub] [arrow] westonpace commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-868113047


   @pitrou Don't worry about the delay.  I think I've figured out a better approach now using an internal header.  Now the public header is much cleaner and I got rid of the Thread class.  Failing check (s3fs) is unrelated.





[GitHub] [arrow] pitrou commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-865106129


   Hmm, I don't think you have to expose any inheritance at the public API level. You can just do so in the private impl classes (you can call them however you like :-)). That would help clean up the public header too.





[GitHub] [arrow] westonpace closed pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace closed pull request #10401:
URL: https://github.com/apache/arrow/pull/10401


   





[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648833804



##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);
+      completed = true;
+    }));
+  }
+
+  std::unique_lock<std::mutex> lock(mx);
+  cv.wait(lock, [&ready] { return ready; });
+  // Thread pool has shut down, now we unblock the task
+  cv.notify_one();
+  lock.unlock();
+
+  // The worker thread should be able to keep the thread pool alive by itself

Review comment:
       This logic ended up being wrong.  The pointer should indeed (and now does) "disappear".







[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648541270



##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();

Review comment:
       I'm pretty sure you're right, notify may wake up more than one thread.  However, I don't think it can wake up a thread before it starts waiting.  In other words, a thread can never notify itself.
   
   I can probably rewrite this with two conditions though to make it more legible and guard against spurious wakeups.  My goal here is to ensure that the worker thread is spawned before I lose the reference to the thread pool.  Although, thinking on this again, the synchronization may not be needed at all since the lambda that creates a copy of the thread pool's shared_ptr is guaranteed to have been created before `Spawn` returns.







[GitHub] [arrow] westonpace commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-881178890


   This needs a rebase and shouldn't be merged in for 5.0 so I'm going to close this now and reopen after 5.0 is solid.





[GitHub] [arrow] pitrou commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648544911



##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();

Review comment:
       Yes, you should probably use two different predicates and booleans.
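
       A minimal sketch of the two-flag handshake, assuming plain `std::thread` rather than the thread pool under test (the name `RunHandshake` and the flag `unblocked` are made up for illustration and are not taken from the PR):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true if the worker ran to completion after being released.
bool RunHandshake() {
  std::mutex mx;
  std::condition_variable cv;
  bool ready = false;      // set by the worker: "I have started"
  bool unblocked = false;  // set by the main thread: "you may finish"
  bool completed = false;

  std::thread worker([&] {
    std::unique_lock<std::mutex> lock(mx);
    ready = true;
    cv.notify_all();
    // Waits on its own flag, so neither its own notification nor a
    // spurious wakeup can let it through early.
    cv.wait(lock, [&] { return unblocked; });
    completed = true;
  });

  {
    std::unique_lock<std::mutex> lock(mx);
    cv.wait(lock, [&] { return ready; });
    assert(!completed);  // the worker cannot have passed its wait yet
    unblocked = true;
  }
  cv.notify_all();
  worker.join();
  return completed;
}
```

       Each side waits on a flag that only the other side sets, so it no longer matters which waiter a given notification wakes.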







[GitHub] [arrow] pitrou commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648313166



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -106,12 +107,18 @@ template <typename T>
 class SimpleExecutor {
  public:
   explicit SimpleExecutor(int nfutures)
-      : pool_(ThreadPool::Make(/*threads=*/4).ValueOrDie()) {
+      : pool_(SimpleThreadPool::Make(/*threads=*/4).ValueOrDie()) {
     for (int i = 0; i < nfutures; ++i) {
       futures_.push_back(Future<T>::Make());
     }
   }
 
+  // Need to explicitly call the pool shutdown.  The worker threads have a shared_ptr
+  // to the pool so they will keep it alive after this class is destructed.  However, the

Review comment:
       Hmm, does that mean there is a reference cycle which would make the pool immortal? Or just that the pool would be destroyed too late? (hopefully the latter)

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);

Review comment:
       `wait` is prone to spurious wakeups, so it should always be called with a predicate (or the predicate checked separately in a loop).
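
       As a sketch of the predicate form (the helper name `WaitForFlag` is hypothetical, and a plain `std::thread` stands in for a pool worker):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Blocks until another thread sets `flag`; returns the value observed on wake-up.
bool WaitForFlag() {
  std::mutex mx;
  std::condition_variable cv;
  bool flag = false;

  std::thread setter([&] {
    {
      std::lock_guard<std::mutex> lock(mx);
      flag = true;
    }
    cv.notify_one();
  });

  std::unique_lock<std::mutex> lock(mx);
  // Predicate overload: equivalent to `while (!flag) cv.wait(lock);`, so a
  // spurious wakeup, or a notification sent before this thread started
  // waiting, is harmless.
  cv.wait(lock, [&flag] { return flag; });
  assert(lock.owns_lock());  // wait() returns with the mutex re-acquired
  bool observed = flag;
  lock.unlock();
  setter.join();
  return observed;
}
```

       With the bare `cv.wait(lock)` form, the same code could return early on a spurious wakeup or block forever if the notification arrived first.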

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();

Review comment:
       Hmm... how do you know the notified one won't be the `wait` call just below?
   
   Also, I'm not sure that `notify_one` *guarantees* that only one thread will be notified. For example, the [POSIX equivalent](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_signal.html) says:
   > The `pthread_cond_signal()` function shall unblock *at least one* of the threads that are blocked on the specified condition variable
   
   (emphasis mine)
   

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);
+      completed = true;
+    }));
+  }
+
+  std::unique_lock<std::mutex> lock(mx);
+  cv.wait(lock, [&ready] { return ready; });
+  // Thread pool has shut down, now we unblock the task
+  cv.notify_one();
+  lock.unlock();
+
+  // The worker thread should be able to keep the thread pool alive by itself

Review comment:
       Hmm... really? Why is that? If the user doesn't hold any reference to a thread pool anymore, then presumably the thread pool should "disappear". Whether it does before or after the currently running tasks finish, I don't know.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -321,37 +335,139 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
-  struct State;
+  // ------------- Statistics API ---------------
+
+  /// The current number of tasks either currently running or in the queue to run
+  uint64_t NumTasksRunningOrQueued() const;
+  /// A guess at the maximum number of tasks running or queued at any one point
+  uint64_t MaxTasksQueued() const;
+  /// The total number of tasks that have been submitted over the lifetime of the pool
+  uint64_t TotalTasksQueued() const;
+
+  // ------------- Children API -----------------
+
+  /// Called by children when a worker thread completes a task
+  void RecordFinishedTask();
+  /// True if the thread pool is shutting down, should only be checked if a thread has
+  /// no tasks to work on.  This allows us to ensure we drain the task queue before
+  /// shutting down the pool.
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuitNow) again
+  bool ShouldWorkerQuit(ThreadIt* thread_it);
+  /// True if the thread is no longer needed (e.g. excess capacity) or if a quick shutdown
+  /// has been requested.  Should be checked frequently as threads can quit with remaining
+  /// work if this is true
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuit) again
+  bool ShouldWorkerQuitNow(ThreadIt* thread_it);
+  /// Should be called first by a worker thread as soon as it starts up.  Until this call
+  /// finishes `thread_it` will not have a valid value
+  void WaitForReady();
+  /// Called by a child implementation when new work arrives that should wake up idle
+  /// threads.  This will notify one worker waiting on WaitForWork.  Generally called
+  /// in DoSubmitTask but might be called less often if a child implementation wants
+  /// to
+  void NotifyIdleWorker();
+  /// Called by a worker thread that is ready to wait for work to arrive
+  void WaitForWork();
+
+  struct Control;
 
  protected:
+  FRIEND_TEST(TestThreadPool, DestroyWithoutShutdown);
   FRIEND_TEST(TestThreadPool, SetCapacity);
   FRIEND_TEST(TestGlobalThreadPool, Capacity);
   friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
-  ThreadPool();
+  explicit ThreadPool(bool eternal = false);
 
   Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                    StopCallback&&) override;
 
+  /// Called on the child process after a fork.  After a fork all threads will have ceased
+  /// running in the child process.  This method should clean up the thread pool state and
+  /// restart any previously running threads.
+  ///
+  /// The behavior is somewhat ill-defined if tasks are running when the fork happened.
+  /// For more details see ARROW-12879
+  virtual void ResetAfterFork();
+
+  /// Launches a worker thread
+  virtual std::shared_ptr<Thread> LaunchWorker(ThreadIt thread_it) = 0;
+  /// Adds a task to the task queue(s)
+  virtual void DoSubmitTask(TaskHints hints, Task task) = 0;
+  /// Should return true only if there is no work to be done
+  virtual bool Empty() = 0;
   // Collect finished worker threads, making sure the OS threads have exited
   void CollectFinishedWorkersUnlocked();
   // Launch a given number of additional workers
   void LaunchWorkersUnlocked(int threads);
+  // Marks a thread finished and removes it from the workers list
+  void MarkThreadFinishedUnlocked(ThreadIt* thread_it);
   // Get the current actual capacity
-  int GetActualCapacity();
+  int GetActualCapacity() const;
+  // Get the amount of threads we could still launch based on capacity and # of tasks
+  int GetAdditionalThreadsNeeded() const;
   // Reinitialize the thread pool if the pid changed
   void ProtectAgainstFork();
+  void RecordTaskSubmitted();
 
   static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
 
-  std::shared_ptr<State> sp_state_;
-  State* state_;
+  std::atomic<uint64_t> num_tasks_running_;

Review comment:
       Rather than exposing this in a `.h`, I'd rather see those instance variables and methods segregated to a separate object (call it "implementation", "task queue" or whatever you prefer :-)). It would also avoid the `Thread` wrapper.







[GitHub] [arrow] pitrou commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-857898290


   @westonpace I would recommend running the ASAN build locally using `archery docker ...`.





[GitHub] [arrow] westonpace commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-850090913


   @pitrou See #10420 for an example alternative thread pool (some of the changes from 10420 will need to be back-ported to this PR).  Basically the goal is to allow the capacity management / fork protection / shutdown logic to go into the base class while the task queue and worker loops belong to the child implementations.





[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648833516



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -321,37 +335,139 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
-  struct State;
+  // ------------- Statistics API ---------------
+
+  /// The current number of tasks either currently running or in the queue to run
+  uint64_t NumTasksRunningOrQueued() const;
+  /// A guess at the maximum number of tasks running or queued at any one point
+  uint64_t MaxTasksQueued() const;
+  /// The total number of tasks that have been submitted over the lifetime of the pool
+  uint64_t TotalTasksQueued() const;
+
+  // ------------- Children API -----------------
+
+  /// Called by children when a worker thread completes a task
+  void RecordFinishedTask();
+  /// True if the thread pool is shutting down, should only be checked if a thread has
+  /// no tasks to work on.  This allows us to ensure we drain the task queue before
+  /// shutting down the pool.
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuitNow) again
+  bool ShouldWorkerQuit(ThreadIt* thread_it);
+  /// True if the thread is no longer needed (e.g. excess capacity) or if a quick shutdown
+  /// has been requested.  Should be checked frequently as threads can quit with remaining
+  /// work if this is true
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuit) again
+  bool ShouldWorkerQuitNow(ThreadIt* thread_it);
+  /// Should be called first by a worker thread as soon as it starts up.  Until this call
+  /// finishes `thread_it` will not have a valid value
+  void WaitForReady();
+  /// Called by a child implementation when new work arrives that should wake up idle
+  /// threads.  This will notify one worker waiting on WaitForWork.  Generally called
+  /// in DoSubmitTask but might be called less often if a child implementation wants
+  /// to
+  void NotifyIdleWorker();
+  /// Called by a worker thread that is ready to wait for work to arrive
+  void WaitForWork();
+
+  struct Control;
 
  protected:
+  FRIEND_TEST(TestThreadPool, DestroyWithoutShutdown);
   FRIEND_TEST(TestThreadPool, SetCapacity);
   FRIEND_TEST(TestGlobalThreadPool, Capacity);
   friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
-  ThreadPool();
+  explicit ThreadPool(bool eternal = false);
 
   Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                    StopCallback&&) override;
 
+  /// Called on the child process after a fork.  After a fork all threads will have ceased
+  /// running in the child process.  This method should clean up the thread pool state and
+  /// restart any previously running threads.
+  ///
+  /// The behavior is somewhat ill-defined if tasks are running when the fork happened.
+  /// For more details see ARROW-12879
+  virtual void ResetAfterFork();
+
+  /// Launches a worker thread
+  virtual std::shared_ptr<Thread> LaunchWorker(ThreadIt thread_it) = 0;
+  /// Adds a task to the task queue(s)
+  virtual void DoSubmitTask(TaskHints hints, Task task) = 0;
+  /// Should return true only if there is no work to be done
+  virtual bool Empty() = 0;
   // Collect finished worker threads, making sure the OS threads have exited
   void CollectFinishedWorkersUnlocked();
   // Launch a given number of additional workers
   void LaunchWorkersUnlocked(int threads);
+  // Marks a thread finished and removes it from the workers list
+  void MarkThreadFinishedUnlocked(ThreadIt* thread_it);
   // Get the current actual capacity
-  int GetActualCapacity();
+  int GetActualCapacity() const;
+  // Get the amount of threads we could still launch based on capacity and # of tasks
+  int GetAdditionalThreadsNeeded() const;
   // Reinitialize the thread pool if the pid changed
   void ProtectAgainstFork();
+  void RecordTaskSubmitted();
 
   static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
 
-  std::shared_ptr<State> sp_state_;
-  State* state_;
+  std::atomic<uint64_t> num_tasks_running_;

Review comment:
       I've made a stab at this (at the moment the implementation is called `WorkerControl`, though I could be convinced to call it `ThreadPoolImpl`), but it doesn't avoid the thread wrapper because of inheritance.  If `ThreadPool` is a base class that other implementations can extend, then it has a pure virtual `LaunchWorker` that has to return "something that can be joined", which is what the `Thread` wrapper is for.  Options:
   
    * Keep the thread wrapper
    * Just include `<thread>` in `thread_pool.h`
    * Make the pimpl class extensible instead of the public class.  This will force (as best I can tell) all future child implementations to be in `thread_pool.cc`.  It would simplify the logic in other places though.
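
   A rough sketch of the third option, with inheritance confined to the private implementation classes. All names here (`Pool`, `PoolImpl`, `FifoImpl`, `LifoImpl`, `RunOrder`) are hypothetical, the queues are drained on the calling thread to keep the example short, and the header/source split is only indicated by comments:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// ---- what the public header might contain ----
class PoolImpl;  // forward declaration only; no inheritance is visible here

class Pool {
 public:
  static Pool MakeFifo();
  static Pool MakeLifo();
  Pool(Pool&&) noexcept;
  ~Pool();

  void Submit(std::function<void()> task);
  int Drain();  // runs queued tasks on the caller, returns how many ran

 private:
  explicit Pool(std::unique_ptr<PoolImpl> impl);
  std::unique_ptr<PoolImpl> impl_;
};

// ---- what the .cc file might contain ----
class PoolImpl {
 public:
  virtual ~PoolImpl() = default;
  void Push(std::function<void()> task) { tasks_.push_back(std::move(task)); }
  bool Empty() const { return tasks_.empty(); }
  virtual std::function<void()> Pop() = 0;  // the queuing strategy

 protected:
  std::deque<std::function<void()>> tasks_;
};

class FifoImpl : public PoolImpl {
 public:
  std::function<void()> Pop() override {
    auto task = std::move(tasks_.front());
    tasks_.pop_front();
    return task;
  }
};

class LifoImpl : public PoolImpl {
 public:
  std::function<void()> Pop() override {
    auto task = std::move(tasks_.back());
    tasks_.pop_back();
    return task;
  }
};

Pool::Pool(std::unique_ptr<PoolImpl> impl) : impl_(std::move(impl)) {}
Pool::Pool(Pool&&) noexcept = default;
Pool::~Pool() = default;
Pool Pool::MakeFifo() { return Pool(std::unique_ptr<PoolImpl>(new FifoImpl())); }
Pool Pool::MakeLifo() { return Pool(std::unique_ptr<PoolImpl>(new LifoImpl())); }
void Pool::Submit(std::function<void()> task) { impl_->Push(std::move(task)); }
int Pool::Drain() {
  int ran = 0;
  while (!impl_->Empty()) {
    impl_->Pop()();
    ++ran;
  }
  return ran;
}

// Records the order in which three tasks run under the given pool.
std::vector<int> RunOrder(Pool pool) {
  std::vector<int> order;
  for (int i = 1; i <= 3; ++i) {
    pool.Submit([&order, i] { order.push_back(i); });
  }
  pool.Drain();
  return order;
}
```

   The trade-off matches the one noted in the list: callers pick a strategy through a factory function, but every new strategy has to live next to `PoolImpl` in the source file.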







[GitHub] [arrow] pitrou edited a comment on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
pitrou edited a comment on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-865106129


   Hmm, I don't think you have to expose any inheritance at the public API level. You can just do so in the private impl classes (you can call them however you like :-)). That would help clean up the public header too.
   
   (also, er, big sorry for the delay :-S)





[GitHub] [arrow] pitrou commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-848618336


   I'm not sure this is a good idea. I don't expect thread pool implementations to share a lot of code.





[GitHub] [arrow] github-actions[bot] commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-848382149


   https://issues.apache.org/jira/browse/ARROW-12878





[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648833585



##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();

Review comment:
       I've cleaned this test up to use two predicates & booleans.

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);

Review comment:
       Fixed.







[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648543567



##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -452,6 +454,42 @@ TEST_F(TestThreadPool, QuickShutdown) {
   add_tester.CheckNotAllComputed();
 }
 
+TEST_F(TestThreadPool, DestroyWithoutShutdown) {
+  std::mutex mx;
+  std::condition_variable cv;
+  std::weak_ptr<ThreadPool> weak_pool;
+  bool ready = false;
+  bool completed = false;
+  {
+    auto pool = this->MakeThreadPool(1);
+    weak_pool = pool;
+    // Simulate Windows
+    pool->shutdown_on_destroy_ = false;
+
+    ASSERT_OK(pool->Spawn([&mx, &cv, &completed, &ready] {
+      std::unique_lock<std::mutex> lock(mx);
+      ready = true;
+      cv.notify_one();
+      cv.wait(lock);
+      completed = true;
+    }));
+  }
+
+  std::unique_lock<std::mutex> lock(mx);
+  cv.wait(lock, [&ready] { return ready; });
+  // Thread pool has shut down, now we unblock the task
+  cv.notify_one();
+  lock.unlock();
+
+  // The worker thread should be able to keep the thread pool alive by itself

Review comment:
       It's more of an implementation detail (since the worker thread calls methods on the thread pool).  You are right that it doesn't really exist as a "black box" requirement.  The thread pool user shouldn't care.
   
   I can take out the assert.  If the worker thread were doing the wrong thing, it would segfault and/or throw an exception, which should cause the test to fail, so there may not be anything I need to assert.







[GitHub] [arrow] westonpace commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-859991386


   This is stable again and ready for review.  As mentioned in https://github.com/apache/arrow/pull/10401#discussion_r648833516 there is a little bit of awkwardness with pimpl + inheritance so let me know if one of those alternatives seems preferable.





[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648832182



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -106,12 +107,18 @@ template <typename T>
 class SimpleExecutor {
  public:
   explicit SimpleExecutor(int nfutures)
-      : pool_(ThreadPool::Make(/*threads=*/4).ValueOrDie()) {
+      : pool_(SimpleThreadPool::Make(/*threads=*/4).ValueOrDie()) {
     for (int i = 0; i < nfutures; ++i) {
       futures_.push_back(Future<T>::Make());
     }
   }
 
+  // Need to explicitly call the pool shutdown.  The worker threads have a shared_ptr
+  // to the pool so they will keep it alive after this class is destructed.  However, the

Review comment:
       Actually, it was the former, and that was the ASAN error.  If the worker threads have a strong reference to the thread pool, then the thread pool is never destroyed unless it is shut down (which doesn't work for the eternal pools).  I've fixed it now so that worker threads only hold a reference to the thread pool's state and not to the thread pool itself (as it was originally).
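
       To illustrate the ownership arrangement described above, here is a rough sketch under stated assumptions.  `TinyPool`, `PoolState` and `RunTasksAndDestroy` are hypothetical names, and this is not Arrow's `ThreadPool`; it only shows workers sharing ownership of the state while the pool object itself is owned solely by its user.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical names throughout; this is not Arrow's ThreadPool.
struct PoolState {
  std::mutex mx;
  std::condition_variable cv;
  std::deque<std::function<void()>> tasks;
  bool stop = false;
};

class TinyPool {
 public:
  explicit TinyPool(int nthreads) : state_(std::make_shared<PoolState>()) {
    for (int i = 0; i < nthreads; ++i) {
      // Each worker copies the shared_ptr to the state, never to the pool,
      // so the workers cannot keep the pool object itself alive.
      std::shared_ptr<PoolState> state = state_;
      workers_.emplace_back([state] { WorkerLoop(*state); });
    }
  }

  ~TinyPool() {
    {
      std::lock_guard<std::mutex> lock(state_->mx);
      state_->stop = true;
    }
    state_->cv.notify_all();
    for (auto& t : workers_) t.join();
    assert(state_->tasks.empty());  // workers drained the queue before exiting
  }

  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(state_->mx);
      state_->tasks.push_back(std::move(task));
    }
    state_->cv.notify_one();
  }

 private:
  static void WorkerLoop(PoolState& state) {
    std::unique_lock<std::mutex> lock(state.mx);
    while (true) {
      state.cv.wait(lock, [&] { return state.stop || !state.tasks.empty(); });
      if (state.tasks.empty()) return;  // stop requested and queue drained
      auto task = std::move(state.tasks.front());
      state.tasks.pop_front();
      lock.unlock();  // run the task without holding the queue lock
      task();
      lock.lock();
    }
  }

  std::shared_ptr<PoolState> state_;
  std::vector<std::thread> workers_;
};

// Spawns `ntasks` increments, lets the pool go out of scope, returns the count.
int RunTasksAndDestroy(int ntasks) {
  std::atomic<int> counter{0};
  {
    TinyPool pool(2);
    for (int i = 0; i < ntasks; ++i) pool.Spawn([&counter] { ++counter; });
  }  // the destructor runs here because no worker owns the pool
  return counter.load();
}
```

       Had the workers captured a `shared_ptr` to the pool itself, the closing brace in `RunTasksAndDestroy` would not have destroyed the pool, which is the leak described above.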







[GitHub] [arrow] westonpace commented on a change in pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648533784



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -106,12 +107,18 @@ template <typename T>
 class SimpleExecutor {
  public:
   explicit SimpleExecutor(int nfutures)
-      : pool_(ThreadPool::Make(/*threads=*/4).ValueOrDie()) {
+      : pool_(SimpleThreadPool::Make(/*threads=*/4).ValueOrDie()) {
     for (int i = 0; i < nfutures; ++i) {
       futures_.push_back(Future<T>::Make());
     }
   }
 
+  // Need to explicitly call the pool shutdown.  The worker threads have a shared_ptr
+  // to the pool so they will keep it alive after this class is destructed.  However, the

Review comment:
       It is the latter.  The test spawns a task and gives it a reference (via `this` capture) to a future.  The future is destroyed when the test is destroyed, which was happening before the spawned task had finished.
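
       The lifetime hazard described above, and one way to avoid it, can be sketched roughly as follows.  `Fixture`, `SpawnTask` and `RunFixture` are hypothetical names, a `std::thread` stands in for a task spawned on the pool, and an `int` member stands in for the future.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical fixture; a std::thread stands in for a task spawned on a pool.
class Fixture {
 public:
  explicit Fixture(std::atomic<int>* sink) : sink_(sink) {}

  void SpawnTask() {
    // The task captures `this`, so it must not outlive the fixture.
    worker_ = std::thread([this] {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      sink_->store(value_);  // touches a member of the fixture
    });
  }

  ~Fixture() {
    // Wait for the task before any member it touches is destroyed.
    if (worker_.joinable()) worker_.join();
  }

 private:
  std::atomic<int>* sink_;
  int value_ = 7;
  std::thread worker_;
};

int RunFixture() {
  std::atomic<int> sink{0};
  {
    Fixture fixture(&sink);
    fixture.SpawnTask();
  }  // the destructor joins, so the task has finished by this point
  assert(sink.load() == 7);
  return sink.load();
}
```

       Without the join in the destructor, the task could read `value_` after the fixture was gone, which is the kind of use-after-free ASAN reports.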







[GitHub] [arrow] westonpace commented on pull request #10401: ARROW-12878: [C++] Generalize thread pool to allow for different queuing strategies / worker loops

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#issuecomment-857305138


   There is still one more test failure (ASAN / UBSAN) to work through but this should be ready to review.  In addition to work stealing this base could be used for experiments into round robin scheduling or priority scheduling.  Related: ARROW-8767

