You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/02/07 21:40:12 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #34060: GH-34059: [C++] Add a fetch node based on a batch index

lidavidm commented on code in PR #34060:
URL: https://github.com/apache/arrow/pull/34060#discussion_r1099246250


##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -408,6 +408,54 @@ struct ARROW_EXPORT Declaration {
   std::string label;
 };
 
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;

Review Comment:
   nit: initialize with `= false`, like all the other fields?



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -209,6 +212,11 @@ struct ARROW_EXPORT ExecBatch {
   /// whether any values are Scalar.
   int64_t length = 0;
 
+  /// \brief index of this batch in a sorted stream of batches
+  ///
+  /// This index must be strictly monotoic starting at 0 without gaps

Review Comment:
   nit
   ```suggestion
     /// This index must be strictly monotonic starting at 0 without gaps, or kUnsequencedIndex
   ```



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -53,5 +56,98 @@ class AccumulationQueue {
   std::vector<ExecBatch> batches_;
 };
 
+/// A queue that sequences incoming batches
+///
+/// This can be used when a node needs to do some kind of ordered processing on
+/// the stream.
+///
+/// Batches can be inserted in any order.  The process_callback will be called on
+/// the batches, in order, without reentrant calls. For this reason the callback
+/// should be quick.
+///
+/// For example, in a top-n node, the process callback should determine how many
+/// rows need to be delivered for the given batch, and then return a task to actually
+/// deliver those rows.
+class SequencingQueue {
+ public:
+  using Task = std::function<Status()>;
+
+  /// Strategy that describes how to handle items
+  class Processor {
+   public:
+    /// Process the batch, potentially generating a task
+    ///
+    /// This method will be called on each batch in order.  Calls to this method
+    /// will be serialized and it will not be called reentrantly.  This makes it
+    /// safe to do things that rely on order but minimal time should be spent here
+    /// to avoid becoming a bottlneck.
+    ///
+    /// \return a follow-up task that will be scheduled.  The follow-up task(s) are
+    ///         is not guaranteed to run in any particular order.  If nullopt is
+    ///         returned then nothing will be scheduled.
+    virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
+    /// Schedule a task
+    virtual void Schedule(Task task) = 0;
+  };
+
+  virtual ~SequencingQueue() = default;
+
+  /// Insert a batch into the queue
+  ///
+  /// This will insert the batch into the queue.  If this batch was the next batch
+  /// to deliver then this will trigger 1+ calls to the process callback to generate
+  /// 1+ tasks.
+  ///
+  /// The task generated by this call will be executed immediately.  The remaining
+  /// tasks will be scheduled using the schedule callback.

Review Comment:
   nit: this is a little vague to me (without having read the implementation yet). "The task" here is "The first task" right?



##########
cpp/src/arrow/testing/generator.h:
##########
@@ -234,4 +236,84 @@ class ARROW_TESTING_EXPORT ConstantArrayGenerator {
 ARROW_TESTING_EXPORT
 Result<std::shared_ptr<Array>> ScalarVectorToArray(const ScalarVector& scalars);
 
+namespace gen {
+
+/// \brief Abstract generator of test arrays
+///
+/// Implementations report their output type through type().
+class ARROW_TESTING_EXPORT ArrayGenerator {
+ public:
+  virtual ~ArrayGenerator() = default;
+  /// \brief Generate an array
+  ///
+  /// Presumably the result has `num_rows` elements; TODO confirm against the
+  /// implementations.
+  virtual Result<std::shared_ptr<Array>> Generate(int64_t num_rows) = 0;
+  /// \brief The data type this generator is declared to produce
+  ///
+  /// NOTE(review): presumably matches the type of every array returned by
+  /// Generate; verify in implementations.
+  virtual std::shared_ptr<DataType> type() const = 0;
+};
+
+/// \brief Abstract generator of tabular test data
+///
+/// Every method returns a Result, so generation failures are reported to the
+/// caller rather than asserted.  The ExecBatch methods exist only when
+/// ARROW_COMPUTE is defined.
+class ARROW_TESTING_EXPORT DataGenerator {
+ public:
+  virtual ~DataGenerator() = default;
+  /// \brief Generate a single record batch (presumably with `num_rows` rows)
+  virtual Result<std::shared_ptr<::arrow::RecordBatch>> RecordBatch(int64_t num_rows) = 0;
+  /// \brief Generate `num_batches` record batches
+  ///
+  /// Each batch presumably has `rows_per_batch` rows; TODO confirm.
+  virtual Result<std::vector<std::shared_ptr<::arrow::RecordBatch>>> RecordBatches(
+      int64_t rows_per_batch, int num_batches) = 0;
+#ifdef ARROW_COMPUTE
+  /// \brief ExecBatch counterpart of RecordBatch (ARROW_COMPUTE builds only)
+  virtual Result<::arrow::compute::ExecBatch> ExecBatch(int64_t num_rows) = 0;
+  /// \brief ExecBatch counterpart of RecordBatches (ARROW_COMPUTE builds only)
+  virtual Result<std::vector<::arrow::compute::ExecBatch>> ExecBatches(
+      int64_t rows_per_batch, int num_batches) = 0;
+#endif
+  /// \brief Generate a table made of `num_chunks` chunks (one by default)
+  ///
+  /// Each chunk presumably has `rows_per_chunk` rows; TODO confirm.
+  virtual Result<std::shared_ptr<::arrow::Table>> Table(int64_t rows_per_chunk,
+                                                        int num_chunks = 1) = 0;
+  /// \brief The schema of the generated data
+  virtual std::shared_ptr<::arrow::Schema> Schema() = 0;
+};
+
+// Same as DataGenerator, but each method returns its value directly instead of
+// a Result: rather than propagating an error, an OK status is EXPECT'd (gtest),
+// so this interface is meant for use inside tests.
+class ARROW_TESTING_EXPORT GTestDataGenerator {
+ public:
+  virtual ~GTestDataGenerator() = default;
+  /// \brief Generate a single record batch (presumably with `num_rows` rows)
+  virtual std::shared_ptr<::arrow::RecordBatch> RecordBatch(int64_t num_rows) = 0;
+  /// \brief Generate `num_batches` record batches
+  virtual std::vector<std::shared_ptr<::arrow::RecordBatch>> RecordBatches(
+      int64_t rows_per_batch, int num_batches) = 0;
+#ifdef ARROW_COMPUTE
+  /// \brief ExecBatch counterpart of RecordBatch (ARROW_COMPUTE builds only)
+  virtual ::arrow::compute::ExecBatch ExecBatch(int64_t num_rows) = 0;
+  /// \brief ExecBatch counterpart of RecordBatches (ARROW_COMPUTE builds only)
+  virtual std::vector<::arrow::compute::ExecBatch> ExecBatches(int64_t rows_per_batch,
+                                                               int num_batches) = 0;
+#endif
+  /// \brief Generate a table made of `num_chunks` chunks (one by default)
+  virtual std::shared_ptr<::arrow::Table> Table(int64_t rows_per_chunk,
+                                                int num_chunks = 1) = 0;
+  /// \brief The schema of the generated data
+  virtual std::shared_ptr<::arrow::Schema> Schema() = 0;
+};
+
+/// \brief A column generator paired with a name
+///
+/// Accepted by the Gen/TestGen overloads that take std::vector<GeneratorField>.
+/// NOTE(review): presumably `name` becomes the field name in the generated
+/// schema; confirm in the implementation.
+struct ARROW_TESTING_EXPORT GeneratorField {
+  std::string name;
+  std::shared_ptr<ArrayGenerator> gen;
+};
+
+/// \brief Create a DataGenerator from a list of column generators
+///
+/// NOTE(review): field names are not supplied here, so presumably they are
+/// assigned automatically; TODO confirm.
+ARROW_TESTING_EXPORT std::unique_ptr<DataGenerator> Gen(
+    std::vector<std::shared_ptr<ArrayGenerator>> column_gens);
+/// \brief Create a DataGenerator from named column generators
+ARROW_TESTING_EXPORT std::unique_ptr<DataGenerator> Gen(
+    std::vector<GeneratorField> column_gens);
+// For generating batches with 0 columns (though they can still have length)
+ARROW_TESTING_EXPORT std::unique_ptr<DataGenerator> EmptyGen();
+
+/// \brief Like Gen, but returns a GTestDataGenerator (values, not Results)
+ARROW_TESTING_EXPORT std::unique_ptr<GTestDataGenerator> TestGen(
+    std::vector<std::shared_ptr<ArrayGenerator>> column_gens);
+/// \brief Like Gen, but returns a GTestDataGenerator (values, not Results)
+ARROW_TESTING_EXPORT std::unique_ptr<GTestDataGenerator> TestGen(
+    std::vector<GeneratorField> column_gens);
+// For generating batches with 0 columns (though they can still have length)
+ARROW_TESTING_EXPORT std::unique_ptr<GTestDataGenerator> EmptyTestGen();

Review Comment:
   Possibly, instead of duplicating every factory function, a single function (or an instance method of DataGenerator) could convert a DataGenerator into a GTestDataGenerator?



##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -54,5 +59,112 @@ void AccumulationQueue::Clear() {
 }
 
 ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
+
+namespace {
+
+/// \brief Ordering for std::priority_queue that surfaces the smallest index
+///
+/// std::priority_queue keeps its "greatest" element at top(), so treating a
+/// higher index as "less" turns the queue into a min-heap on ExecBatch::index.
+struct LowestBatchIndexAtTop {
+  bool operator()(const ExecBatch& lhs, const ExecBatch& rhs) const {
+    return rhs.index < lhs.index;
+  }
+};
+
+class SequencingQueueImpl : public SequencingQueue {
+ public:
+  explicit SequencingQueueImpl(Processor* processor) : processor_(processor) {}
+
+  /// \brief Insert a batch, delivering it immediately if it is the next one
+  ///
+  /// A batch whose index is not next_index_ is parked in queue_ until every
+  /// earlier index has been delivered.
+  Status InsertBatch(ExecBatch batch) override {
+    // Validate on entry.  By the time a batch reaches DeliverNextUnlocked its
+    // index already equals next_index_, so a check placed there cannot catch an
+    // unsequenced batch inserted out of turn.  Such a batch would instead be
+    // parked in queue_ and silently never delivered.
+    // Should be able to detect and avoid this at plan construction
+    DCHECK_NE(batch.index, ::arrow::compute::kUnsequencedIndex)
+        << "attempt to use a sequencing queue on an unsequenced stream of batches";
+    std::unique_lock lk(mutex_);
+    if (batch.index == next_index_) {
+      return DeliverNextUnlocked(std::move(batch), std::move(lk));
+    }
+    queue_.emplace(std::move(batch));
+    return Status::OK();
+  }
+
+ private:
+  /// \brief Process `batch` plus every queued batch that directly follows it
+  ///
+  /// Entered with `lk` held.  Processor::Process is called under the lock, so
+  /// calls are serialized and happen in index order.  The lock is released
+  /// before any task runs.  Tasks for the previously queued batches go to
+  /// Processor::Schedule.  The task for `batch` itself runs inline on the
+  /// calling thread.
+  Status DeliverNextUnlocked(ExecBatch batch, std::unique_lock<std::mutex>&& lk) {
+    std::vector<Task> tasks;
+    next_index_++;
+    ARROW_ASSIGN_OR_RAISE(std::optional<Task> this_task,
+                          processor_->Process(std::move(batch)));
+    while (!queue_.empty() && next_index_ == queue_.top().index) {
+      ARROW_ASSIGN_OR_RAISE(std::optional<Task> task, processor_->Process(queue_.top()));
+      if (task) {
+        tasks.push_back(std::move(*task));
+      }
+      queue_.pop();
+      next_index_++;
+    }
+    lk.unlock();
+    // Schedule tasks for batches that were waiting in the queue for their turn
+    for (auto& task : tasks) {
+      processor_->Schedule(std::move(task));
+    }
+    // Run the current item immediately
+    if (this_task) {
+      ARROW_RETURN_NOT_OK(std::move(*this_task)());
+    }
+    return Status::OK();
+  }
+
+  Processor* processor_;
+
+  std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_;
+  int next_index_ = 0;
+  std::mutex mutex_;
+};
+
+class SerialSequencingQueueImpl : public SerialSequencingQueue {
+ public:
+  explicit SerialSequencingQueueImpl(Processor* processor) : processor_(processor) {}
+
+  Status InsertBatch(ExecBatch batch) override {
+    std::unique_lock lk(mutex_);
+    queue_.push(std::move(batch));
+    if (queue_.top().index == next_index_ && !is_processing_) {
+      is_processing_ = true;
+      return DoProcess(std::move(lk));
+    }
+    return Status::OK();
+  }
+
+ private:
+  Status DoProcess(std::unique_lock<std::mutex>&& lk) {
+    while (!queue_.empty() && queue_.top().index == next_index_) {
+      ExecBatch next(queue_.top());
+      queue_.pop();
+      next_index_++;
+      lk.unlock();
+      // If we bail here we don't hold the lock so that is ok.  is_processing_ will
+      // never switch to true so no other threads can process but that should be ok

Review Comment:
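   Should this say "never switch to false"? `is_processing_` is set to true before `DoProcess` is called, so bailing out here leaves it stuck at true.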



##########
cpp/src/arrow/compute/exec/fetch_node.cc:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sstream>
+
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/map_node.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/query_context.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/tracing_internal.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+namespace {
+
+class FetchCounter {
+ public:
+  struct Page {
+    int64_t to_skip;
+    int64_t to_send;
+    bool ended;
+  };
+
+  FetchCounter(int64_t rows_to_skip, int64_t rows_to_send)
+      : rows_to_send_(rows_to_send), rows_to_skip_(rows_to_skip) {}
+
+  Page NextPage(const ExecBatch& batch) {
+    int64_t rows_in_batch_to_skip = 0;
+    if (rows_to_skip_ > 0) {
+      rows_in_batch_to_skip = std::min(rows_to_skip_, batch.length);
+      rows_to_skip_ -= rows_in_batch_to_skip;
+    }
+
+    int64_t rows_in_batch_to_send = 0;
+    if (rows_to_send_ > 0) {

Review Comment:
   hmm, don't we only want to send rows if remaining rows_to_skip_ == 0?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org