Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/09 15:59:51 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #14524: ARROW-17509: [C++] Simplify async scheduler by removing the need to call End

lidavidm commented on code in PR #14524:
URL: https://github.com/apache/arrow/pull/14524#discussion_r1018083155


##########
cpp/src/arrow/util/async_util.h:
##########
@@ -48,50 +50,25 @@ namespace util {
 /// By default the scheduler will submit the task (execute the synchronous part) as
 /// soon as it is added, assuming the underlying thread pool hasn't terminated or the
 /// scheduler hasn't aborted.  In this mode, the scheduler is simply acting as
-/// a task group (keeping track of the ongoing work).
+/// a simple task group.
 ///
-/// This can be used to provide structured concurrency for asynchronous development.
-/// A task group created at a high level can be distributed amongst low level components
-/// which register work to be completed.  The high level job can then wait for all work
-/// to be completed before cleaning up.
-///
-/// A task scheduler must eventually be ended when all tasks have been added.  Once the
-/// scheduler has been ended it is an error to add further tasks.  Note, it is not an
-/// error to add additional tasks after a scheduler has aborted (though these tasks
-/// will be ignored and never submitted).  The scheduler has a future which will complete
-/// once the scheduler has been ended AND all remaining tasks have finished executing.
-/// Ending a scheduler will NOT cause the scheduler to flush existing tasks.
+/// A task scheduler starts with an initial task.  That task, and all subsequent tasks
+/// are free to add subtasks.  Once all submitted tasks finsih the scheduler will

Review Comment:
   ```suggestion
   /// are free to add subtasks.  Once all submitted tasks finish the scheduler will
   ```
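
The lifecycle described in the hunk above boils down to: the scheduler is created with an initial task, that task (and any task it spawns) may add further subtasks, and the scheduler's future completes once every task has finished.  A minimal sketch follows; the `AsyncTaskScheduler::Make` entry point and the `AddSimpleTask` helper are not quoted in this hunk, so treat those names and signatures as assumptions rather than the PR's confirmed API.

```cpp
// Rough sketch only: Make/AddSimpleTask are assumed names, not quoted above.
#include "arrow/util/async_util.h"
#include "arrow/util/future.h"

arrow::Future<> RunJob() {
  // The scheduler is created with an initial task.  That task (and any task it
  // spawns) may add further subtasks.  The returned future completes once every
  // task that was ever added has finished; there is no separate End() call.
  return arrow::util::AsyncTaskScheduler::Make(
      [](arrow::util::AsyncTaskScheduler* scheduler) -> arrow::Status {
        for (int i = 0; i < 4; ++i) {
          scheduler->AddSimpleTask([] {
            // Each subtask returns a future representing its asynchronous work.
            return arrow::Result<arrow::Future<>>(arrow::Future<>::MakeFinished());
          });
        }
        return arrow::Status::OK();
      });
}
```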



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -100,345 +119,323 @@ class FifoQueue : public AsyncTaskScheduler::Queue {
 
 class AlreadyFailedScheduler : public AsyncTaskScheduler {

Review Comment:
   Is this actually used anymore?



##########
cpp/src/arrow/util/async_util.h:
##########
@@ -145,206 +188,216 @@ class ARROW_EXPORT AsyncTaskScheduler {
     /// acquired and the caller can proceed.  If a future is returned then the caller
     /// should wait for the future to complete first.  When the returned future completes
     /// the permits have NOT been acquired and the caller must call Acquire again
+    ///
+    /// \param amt the number of permits to acquire
     virtual std::optional<Future<>> TryAcquire(int amt) = 0;
     /// Release amt permits
     ///
     /// This will possibly complete waiting futures and should probably not be
     /// called while holding locks.
+    ///
+    /// \param amt the number of permits to release
     virtual void Release(int amt) = 0;
 
     /// The size of the largest task that can run
     ///
     /// Incoming tasks will have their cost latched to this value to ensure
-    /// they can still run (although they will generally be the only thing allowed to
+    /// they can still run (although they will be the only thing allowed to
     /// run at that time).
     virtual int Capacity() = 0;
+
+    /// Pause the throttle
+    ///
+    /// Any tasks that have been submitted already will continue.  However, no new tasks
+    /// will be run until the throttle is resumed.
+    virtual void Pause() = 0;
+    /// Resume the throttle
+    ///
+    /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+    /// it will still apply.
+    virtual void Resume() = 0;
   };
-  /// Create a throttle
+
+  /// Pause the throttle
+  ///
+  /// Any tasks that have been submitted already will continue.  However, no new tasks
+  /// will be run until the throttle is resumed.
+  virtual void Pause() = 0;
+  /// Resume the throttle
   ///
-  /// This throttle is used to limit how many tasks can run at once.  The
-  /// user should keep the throttle alive for the lifetime of the scheduler.
-  /// The same throttle can be used in multiple schedulers.
-  static std::unique_ptr<Throttle> MakeThrottle(int max_concurrent_cost);
+  /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+  /// it will still apply.
+  virtual void Resume() = 0;
 
-  /// Add a task to the scheduler
+  /// Create a throttled view of a scheduler
   ///
-  /// If the scheduler is in an aborted state this call will return false and the task
-  /// will never be run.  This is harmless and does not need to be guarded against.
+  /// Tasks added via this view will be subjected to the throttle and, if the tasks cannot
+  /// run immediately, will be placed into a queue.
   ///
-  /// If the scheduler is in an ended state then this call will cause an program abort.
-  /// This represents a logic error in the program and should be avoidable.
+  /// Using a throttled view after the underlying scheduler has finished is invalid.
   ///
-  /// If there are no limits on the number of concurrent tasks then the submit function
-  /// will be run immediately.
+  /// Although a shared_ptr is returned it should generally be assumed that the caller
+  /// is being given exclusive ownership.  The shared_ptr is used to share the view with
+  /// queued and submitted tasks and the lifetime of those is unpredictable.  It is
+  /// important the caller keep the returned pointer alive for as long as they plan to add
+  /// tasks to the view.
   ///
-  /// Otherwise, if there is a throttle, and it is full, then this task will be inserted
-  /// into the scheduler's queue and submitted when there is space.
+  /// \param scheduler a scheduler to submit tasks to after throttling
   ///
-  /// The return value for this call can usually be ignored.  There is little harm in
-  /// attempting to add tasks to an aborted scheduler.  It is only included for callers
-  /// that want to avoid future task generation.
+  /// This can be the root scheduler, another throttled scheduler, or a task group.  These
+  /// are all composable.
   ///
-  /// \return true if the task was submitted or queued, false if the task was ignored
-  virtual bool AddTask(std::unique_ptr<Task> task) = 0;
+  /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time
+  ///
+  /// If a task is added that has a cost greater than max_concurrent_cost then its cost
+  /// will be reduced to max_concurrent_cost so that it is still possible for the task to
+  /// run.
+  ///
+  /// \param queue the queue to use when tasks cannot be submitted
+  ///
+  /// By default a FIFO queue will be used.  However, a custom queue can be provided if
+  /// some tasks have higher priority than other tasks.
+  static std::shared_ptr<ThrottledAsyncTaskScheduler> Make(
+      AsyncTaskScheduler* scheduler, int max_concurrent_cost,
+      std::unique_ptr<Queue> queue = NULLPTR);
 
-  /// Adds an async generator to the scheduler
+  /// @brief Create a ThrottledAsyncTaskScheduler using a custom throttle
   ///
-  /// The async generator will be visited, one item at a time.  Submitting a task
-  /// will consist of polling the generator for the next future.  The generator's future
-  /// will then represent the task itself.
+  /// \see Make
+  static std::shared_ptr<ThrottledAsyncTaskScheduler> MakeWithCustomThrottle(
+      AsyncTaskScheduler* scheduler, std::unique_ptr<Throttle> throttle,
+      std::unique_ptr<Queue> queue = NULLPTR);
+};
+
+/// A utility to keep track of a collection of tasks
+///
+/// Often it is useful to keep track of some state that only needs to stay alive
+/// for some small collection of tasks, or to perform some kind of final cleanup
+/// when a collection of tasks is finished.
+///
+/// For example, when scanning, we need to keep the file reader alive while all scan
+/// tasks run for a given file, and then we can gracefully close it when we finish the
+/// file.
+class ARROW_EXPORT AsyncTaskGroup : public AsyncTaskScheduler {
+ public:
+  /// Destructor for the task group
   ///
-  /// This visits the task serially without readahead.  If readahead or parallelism
-  /// is desired then it should be added in the generator itself.
+  /// The finish callback will not run until the task group is destroyed and all
+  /// tasks are finished so you will generally want to eagerly call this at some point

Review Comment:
   "eagerly call this" meaning "explicitly destroy the task group"?


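The AsyncTaskGroup pattern described in the same hunk (keep per-file state alive until the last scan task finishes) could look roughly like this.  Only the class itself and the existence of a finish callback are quoted above; `AsyncTaskGroup::Make`, its callback signature, and the `FileReader` stand-in are assumptions for illustration.

```cpp
#include <memory>
#include "arrow/util/async_util.h"

// Stand-in for whatever per-file state must outlive the scan tasks.
struct FileReader {
  arrow::Status Close() { return arrow::Status::OK(); }
};

void ScanOneFile(arrow::util::AsyncTaskScheduler* scheduler,
                 std::shared_ptr<FileReader> reader) {
  // The finish callback runs once the group has been destroyed and every task
  // added to it has finished, so the reader stays alive until then.
  auto group = arrow::util::AsyncTaskGroup::Make(
      scheduler, [reader]() -> arrow::Status { return reader->Close(); });
  // ... add this file's scan tasks to `group` ...
  // Letting `group` go out of scope does not cancel anything; it only signals
  // that no more tasks will be added to the group.
}
```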

##########
cpp/src/arrow/util/async_util.h:
##########
@@ -145,206 +188,216 @@ class ARROW_EXPORT AsyncTaskScheduler {
     /// acquired and the caller can proceed.  If a future is returned then the caller
     /// should wait for the future to complete first.  When the returned future completes
     /// the permits have NOT been acquired and the caller must call Acquire again
+    ///
+    /// \param amt the number of permits to acquire
     virtual std::optional<Future<>> TryAcquire(int amt) = 0;
     /// Release amt permits
     ///
     /// This will possibly complete waiting futures and should probably not be
     /// called while holding locks.
+    ///
+    /// \param amt the number of permits to release
     virtual void Release(int amt) = 0;
 
     /// The size of the largest task that can run
     ///
     /// Incoming tasks will have their cost latched to this value to ensure
-    /// they can still run (although they will generally be the only thing allowed to
+    /// they can still run (although they will be the only thing allowed to
     /// run at that time).
     virtual int Capacity() = 0;
+
+    /// Pause the throttle
+    ///
+    /// Any tasks that have been submitted already will continue.  However, no new tasks
+    /// will be run until the throttle is resumed.
+    virtual void Pause() = 0;
+    /// Resume the throttle
+    ///
+    /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+    /// it will still apply.
+    virtual void Resume() = 0;
   };
-  /// Create a throttle
+
+  /// Pause the throttle
+  ///
+  /// Any tasks that have been submitted already will continue.  However, no new tasks
+  /// will be run until the throttle is resumed.
+  virtual void Pause() = 0;
+  /// Resume the throttle
   ///
-  /// This throttle is used to limit how many tasks can run at once.  The
-  /// user should keep the throttle alive for the lifetime of the scheduler.
-  /// The same throttle can be used in multiple schedulers.
-  static std::unique_ptr<Throttle> MakeThrottle(int max_concurrent_cost);
+  /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+  /// it will still apply.
+  virtual void Resume() = 0;
 
-  /// Add a task to the scheduler
+  /// Create a throttled view of a scheduler
   ///
-  /// If the scheduler is in an aborted state this call will return false and the task
-  /// will never be run.  This is harmless and does not need to be guarded against.
+  /// Tasks added via this view will be subjected to the throttle and, if the tasks cannot
+  /// run immediately, will be placed into a queue.
   ///
-  /// If the scheduler is in an ended state then this call will cause an program abort.
-  /// This represents a logic error in the program and should be avoidable.
+  /// Using a throttled view after the underlying scheduler has finished is invalid.

Review Comment:
   nit, but it may be a little confusing/error-prone to have different API contracts for a subclass of the main API
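
On the contract itself, the Throttle interface quoted above (TryAcquire/Release/Capacity/Pause/Resume) can be implemented directly and plugged in via `MakeWithCustomThrottle`.  Below is a deliberately trivial pass-through throttle to illustrate the contract; the assumption that `Throttle` is the nested type consumed by `MakeWithCustomThrottle`, and the `NoopThrottle` name, are illustrative rather than taken from the PR.

```cpp
#include <limits>
#include <memory>
#include <optional>
#include "arrow/util/async_util.h"

// A pass-through throttle: it never holds tasks back, effectively disabling
// throttling.  A real implementation would track outstanding cost and return a
// future from TryAcquire once the limit is reached.
class NoopThrottle : public arrow::util::ThrottledAsyncTaskScheduler::Throttle {
 public:
  // Returning an empty optional means the permits were acquired immediately.
  std::optional<arrow::Future<>> TryAcquire(int amt) override { return std::nullopt; }
  // Nothing is ever withheld, so there are no waiting futures to complete.
  void Release(int amt) override {}
  // Advertise a huge capacity so incoming task costs are never latched down.
  int Capacity() override { return std::numeric_limits<int>::max(); }
  // Pausing and resuming are no-ops in this sketch.
  void Pause() override {}
  void Resume() override {}
};

// Usage with the factory quoted above (scheduler is the underlying scheduler):
// auto throttled = arrow::util::ThrottledAsyncTaskScheduler::MakeWithCustomThrottle(
//     scheduler, std::make_unique<NoopThrottle>());
```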



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org