Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/02/08 01:46:04 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #34060: GH-34059: [C++] Add a fetch node based on a batch index

westonpace commented on code in PR #34060:
URL: https://github.com/apache/arrow/pull/34060#discussion_r1099540214


##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -53,5 +56,98 @@ class AccumulationQueue {
   std::vector<ExecBatch> batches_;
 };
 
+/// A queue that sequences incoming batches
+///
+/// This can be used when a node needs to do some kind of ordered processing on
+/// the stream.
+///
+/// Batches can be inserted in any order.  Processor::Process will be called on
+/// the batches, in order, without reentrant calls.  For this reason, the
+/// processing step should be quick.
+///
+/// For example, in a top-n node, Process should determine how many rows need to
+/// be delivered for the given batch, and then return a task to actually deliver
+/// those rows.
+class SequencingQueue {
+ public:
+  using Task = std::function<Status()>;
+
+  /// Strategy that describes how to handle items
+  class Processor {
+   public:
+    /// Process the batch, potentially generating a task
+    ///
+    /// This method will be called on each batch in order.  Calls to this method
+    /// will be serialized and it will not be called reentrantly.  This makes it
+    /// safe to do things that rely on order, but minimal time should be spent here
+    /// to avoid becoming a bottleneck.
+    ///
+    /// \return a follow-up task that will be scheduled.  The follow-up tasks are
+    ///         not guaranteed to run in any particular order.  If nullopt is
+    ///         returned then nothing will be scheduled.
+    virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
+    /// Schedule a task
+    virtual void Schedule(Task task) = 0;
+  };
+
+  virtual ~SequencingQueue() = default;
+
+  /// Insert a batch into the queue
+  ///
+  /// This will insert the batch into the queue.  If this batch is the next batch
+  /// to deliver then this will trigger one or more calls to Processor::Process,
+  /// generating one or more tasks.
+  ///
+  /// The task generated for the newly inserted batch will be executed immediately,
+  /// on the calling thread.  The remaining tasks will be scheduled using
+  /// Processor::Schedule.

Review Comment:
   Yes.  Basically, don't create a new thread task for the batch of data that just arrived, since that batch is already in the CPU cache.  So if the batches arrive:
   
   1 2 4 3 5
   
   then only a single thread task is scheduled (for 4, once 3 arrives).
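   
   As an illustration of the contract (a rough sketch, not code from this PR), a fetch/limit-style processor could look like the following.  The names `FetchProcessor` and `EmitBatch`, the namespace placement, and the empty scheduler hook are assumptions made only for this example; the `SequencingQueue`, `Processor`, `Process`, `Schedule`, and `Task` names come from the header above.
   
   ```cpp
   // Hypothetical sketch, not part of this PR.  Includes and namespace are
   // assumed to match cpp/src/arrow/compute/exec/accumulation_queue.h, where
   // AccumulationQueue lives in arrow::util.
   #include <algorithm>
   #include <cstdint>
   #include <optional>
   #include <utility>
   
   #include "arrow/compute/exec/accumulation_queue.h"
   
   namespace arrow {
   namespace util {
   
   // Keeps only the first `limit` rows of an ordered stream of batches.
   class FetchProcessor : public SequencingQueue::Processor {
    public:
     explicit FetchProcessor(int64_t limit) : remaining_(limit) {}
   
     // Called once per batch, in batch-index order, never reentrantly.  Only
     // the cheap bookkeeping (how many rows to keep) happens here; delivery of
     // the rows is deferred to the returned task.
     Result<std::optional<SequencingQueue::Task>> Process(compute::ExecBatch batch) override {
       if (remaining_ <= 0) {
         // Limit already satisfied; nothing to deliver for this batch.
         return std::optional<SequencingQueue::Task>{};
       }
       int64_t to_emit = std::min(remaining_, batch.length);
       remaining_ -= to_emit;
       // The returned task may run on the inserting thread (for the batch that
       // just arrived) or be handed to Schedule, so it must not rely on order.
       SequencingQueue::Task task = [this, batch = std::move(batch), to_emit]() {
         return EmitBatch(batch.Slice(0, to_emit));
       };
       return std::optional<SequencingQueue::Task>(std::move(task));
     }
   
     // Stale follow-up tasks are handed off here; a real node would forward
     // them to its plan's scheduler (omitted in this sketch).
     void Schedule(SequencingQueue::Task task) override {}
   
    private:
     // Hypothetical downstream delivery; a real node would push to its output.
     Status EmitBatch(compute::ExecBatch batch) { return Status::OK(); }
   
     int64_t remaining_;
   };
   
   }  // namespace util
   }  // namespace arrow
   ```
   
   With a processor like that, in the 1 2 4 3 5 arrival order above, the tasks for batches 1, 2, 3, and 5 run immediately on the thread that inserted them, and only the task for batch 4 goes through Schedule.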


