Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/09 21:01:32 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #14524: ARROW-17509: [C++] Simplify async scheduler by removing the need to call End

westonpace commented on code in PR #14524:
URL: https://github.com/apache/arrow/pull/14524#discussion_r1018410231


##########
cpp/src/arrow/util/async_util.h:
##########
@@ -145,206 +188,216 @@ class ARROW_EXPORT AsyncTaskScheduler {
     /// acquired and the caller can proceed.  If a future is returned then the caller
     /// should wait for the future to complete first.  When the returned future completes
     /// the permits have NOT been acquired and the caller must call Acquire again
+    ///
+    /// \param amt the number of permits to acquire
     virtual std::optional<Future<>> TryAcquire(int amt) = 0;
     /// Release amt permits
     ///
     /// This will possibly complete waiting futures and should probably not be
     /// called while holding locks.
+    ///
+    /// \param amt the number of permits to release
     virtual void Release(int amt) = 0;
 
     /// The size of the largest task that can run
     ///
     /// Incoming tasks will have their cost latched to this value to ensure
-    /// they can still run (although they will generally be the only thing allowed to
+    /// they can still run (although they will be the only thing allowed to
     /// run at that time).
     virtual int Capacity() = 0;
+
+    /// Pause the throttle
+    ///
+    /// Any tasks that have been submitted already will continue.  However, no new tasks
+    /// will be run until the throttle is resumed.
+    virtual void Pause() = 0;
+    /// Resume the throttle
+    ///
+    /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+    /// it will still apply.
+    virtual void Resume() = 0;
   };
-  /// Create a throttle
+
+  /// Pause the throttle
+  ///
+  /// Any tasks that have been submitted already will continue.  However, no new tasks
+  /// will be run until the throttle is resumed.
+  virtual void Pause() = 0;
+  /// Resume the throttle
   ///
-  /// This throttle is used to limit how many tasks can run at once.  The
-  /// user should keep the throttle alive for the lifetime of the scheduler.
-  /// The same throttle can be used in multiple schedulers.
-  static std::unique_ptr<Throttle> MakeThrottle(int max_concurrent_cost);
+  /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+  /// it will still apply.
+  virtual void Resume() = 0;
 
-  /// Add a task to the scheduler
+  /// Create a throttled view of a scheduler
   ///
-  /// If the scheduler is in an aborted state this call will return false and the task
-  /// will never be run.  This is harmless and does not need to be guarded against.
+  /// Tasks added via this view will be subjected to the throttle and, if the tasks cannot
+  /// run immediately, will be placed into a queue.
   ///
-  /// If the scheduler is in an ended state then this call will cause a program abort.
-  /// This represents a logic error in the program and should be avoidable.
+  /// Using a throttled view after the underlying scheduler has finished is invalid.
   ///
-  /// If there are no limits on the number of concurrent tasks then the submit function
-  /// will be run immediately.
+  /// Although a shared_ptr is returned it should generally be assumed that the caller
+  /// is being given exclusive ownership.  The shared_ptr is used to share the view with
+  /// queued and submitted tasks and the lifetime of those is unpredictable.  It is
+  /// important the caller keep the returned pointer alive for as long as they plan to add
+  /// tasks to the view.
   ///
-  /// Otherwise, if there is a throttle, and it is full, then this task will be inserted
-  /// into the scheduler's queue and submitted when there is space.
+  /// \param scheduler a scheduler to submit tasks to after throttling
   ///
-  /// The return value for this call can usually be ignored.  There is little harm in
-  /// attempting to add tasks to an aborted scheduler.  It is only included for callers
-  /// that want to avoid future task generation.
+  /// This can be the root scheduler, another throttled scheduler, or a task group.  These
+  /// are all composable.
   ///
-  /// \return true if the task was submitted or queued, false if the task was ignored
-  virtual bool AddTask(std::unique_ptr<Task> task) = 0;
+  /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time
+  ///
+  /// If a task is added that has a cost greater than max_concurrent_cost then its cost
+  /// will be reduced to max_concurrent_cost so that it is still possible for the task to
+  /// run.
+  ///
+  /// \param queue the queue to use when tasks cannot be submitted
+  ///
+  /// By default a FIFO queue will be used.  However, a custom queue can be provided if
+  /// some tasks have higher priority than other tasks.
+  static std::shared_ptr<ThrottledAsyncTaskScheduler> Make(
+      AsyncTaskScheduler* scheduler, int max_concurrent_cost,
+      std::unique_ptr<Queue> queue = NULLPTR);
 
-  /// Adds an async generator to the scheduler
+  /// \brief Create a ThrottledAsyncTaskScheduler using a custom throttle
   ///
-  /// The async generator will be visited, one item at a time.  Submitting a task
-  /// will consist of polling the generator for the next future.  The generator's future
-  /// will then represent the task itself.
+  /// \see Make
+  static std::shared_ptr<ThrottledAsyncTaskScheduler> MakeWithCustomThrottle(
+      AsyncTaskScheduler* scheduler, std::unique_ptr<Throttle> throttle,
+      std::unique_ptr<Queue> queue = NULLPTR);
+};
+
+/// A utility to keep track of a collection of tasks
+///
+/// Often it is useful to keep track of some state that only needs to stay alive
+/// for some small collection of tasks, or to perform some kind of final cleanup
+/// when a collection of tasks is finished.
+///
+/// For example, when scanning, we need to keep the file reader alive while all scan
+/// tasks run for a given file, and then we can gracefully close it when we finish the
+/// file.
+class ARROW_EXPORT AsyncTaskGroup : public AsyncTaskScheduler {
+ public:
+  /// Destructor for the task group
   ///
-  /// This visits the task serially without readahead.  If readahead or parallelism
-  /// is desired then it should be added in the generator itself.
+  /// The finish callback will not run until the task group is destroyed and all
+  /// tasks are finished so you will generally want to eagerly call this at some point

Review Comment:
   Yes, good point.  This comment made more sense on an older iteration of the code.  I moved this paragraph to the `Make` call, since that is the point where the unique_ptr is returned, and made it clearer that the user should eagerly reset/destroy the unique_ptr.
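
To make the new surface above concrete, here is a minimal usage sketch of the throttled view.  It is not taken from the PR: the arrow::util namespace, the header path, and the assumption that tasks are still submitted through an AddTask-style method on the scheduler interface (as described in the removed documentation in the diff) are guesses; only ThrottledAsyncTaskScheduler::Make, Pause, and Resume come from the diff itself.

    // Sketch only (not from the PR).  Assumes the arrow::util namespace and that the
    // scheduler interface still provides an AddTask-style submission method.
    #include <memory>

    #include "arrow/util/async_util.h"

    using arrow::util::AsyncTaskScheduler;
    using arrow::util::ThrottledAsyncTaskScheduler;

    void RunThrottled(AsyncTaskScheduler* root) {
      // A throttled view of `root` that lets at most 4 units of cost run at once.
      std::shared_ptr<ThrottledAsyncTaskScheduler> throttled =
          ThrottledAsyncTaskScheduler::Make(root, /*max_concurrent_cost=*/4);

      // Tasks would be added through `throttled`; when the throttle is full they are
      // queued (FIFO by default) and submitted to `root` as capacity frees up, e.g.
      //   throttled->AddTask(std::move(task));  // task: std::unique_ptr<Task> (assumed)

      // While paused, no new tasks are submitted; already-submitted tasks keep running.
      throttled->Pause();
      // ... e.g. wait for downstream backpressure to clear ...
      throttled->Resume();

      // Per the docs above, keep `throttled` alive for as long as tasks will be added
      // through it; once no more tasks will be added, the shared_ptr can be dropped.
    }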



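For the MakeWithCustomThrottle path, here is a sketch of a hand-written Throttle that follows the contract documented in the diff (a returned future means the permits were NOT acquired and the caller must call TryAcquire again).  It is illustrative only: the namespace, the nesting of Throttle inside ThrottledAsyncTaskScheduler, and the arrow::Future<> calls (Make, is_valid, MarkFinished) are assumptions, not text from the PR.

    // Sketch only (not from the PR): a simple counting throttle with pause support.
    // Assumes Throttle is nested in ThrottledAsyncTaskScheduler (as the diff suggests)
    // and lives in arrow::util, and that arrow::Future<> offers Make()/is_valid()/
    // MarkFinished() as elsewhere in the Arrow codebase.
    #include <memory>
    #include <mutex>
    #include <optional>

    #include "arrow/util/async_util.h"

    using arrow::util::AsyncTaskScheduler;
    using arrow::util::ThrottledAsyncTaskScheduler;

    class SimpleThrottle : public ThrottledAsyncTaskScheduler::Throttle {
     public:
      explicit SimpleThrottle(int max_cost) : max_cost_(max_cost) {}

      std::optional<arrow::Future<>> TryAcquire(int amt) override {
        std::lock_guard<std::mutex> lk(mutex_);
        // Task costs are already latched to Capacity() by the scheduler, so amt <= max_cost_.
        if (!paused_ && in_flight_ + amt <= max_cost_) {
          in_flight_ += amt;
          return std::nullopt;  // permits acquired, the caller may run the task now
        }
        // Full (or paused): hand out a future.  When it completes the permits have NOT
        // been acquired; the caller must call TryAcquire again.
        if (!backoff_.is_valid()) backoff_ = arrow::Future<>::Make();
        return backoff_;
      }

      void Release(int amt) override {
        arrow::Future<> to_finish;
        {
          std::lock_guard<std::mutex> lk(mutex_);
          in_flight_ -= amt;
          to_finish = backoff_;
          backoff_ = arrow::Future<>();  // invalid future == nobody waiting
        }
        // Complete outside the lock so waiters can retry TryAcquire without deadlocking.
        if (to_finish.is_valid()) to_finish.MarkFinished();
      }

      int Capacity() override { return max_cost_; }

      void Pause() override {
        std::lock_guard<std::mutex> lk(mutex_);
        paused_ = true;  // stop handing out permits; tasks already running are unaffected
      }

      void Resume() override {
        arrow::Future<> to_finish;
        {
          std::lock_guard<std::mutex> lk(mutex_);
          paused_ = false;
          to_finish = backoff_;
          backoff_ = arrow::Future<>();
        }
        if (to_finish.is_valid()) to_finish.MarkFinished();
      }

     private:
      std::mutex mutex_;
      const int max_cost_;
      int in_flight_ = 0;
      bool paused_ = false;
      arrow::Future<> backoff_;  // shared by all current waiters; invalid when none
    };

    // Plugging it in:
    std::shared_ptr<ThrottledAsyncTaskScheduler> MakeCustomThrottled(AsyncTaskScheduler* root) {
      return ThrottledAsyncTaskScheduler::MakeWithCustomThrottle(
          root, std::make_unique<SimpleThrottle>(/*max_cost=*/4));
    }

If some tasks should run ahead of others, a custom Queue can be supplied through the final argument instead of relying on the default FIFO queue, as noted in the Make documentation above.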