You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/01 19:57:03 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #10397: ARROW-11930: [C++][Dataset][Compute] Use an ExecPlan for dataset scans

westonpace commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r643353334



##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -225,22 +212,43 @@ class ARROW_EXPORT ExecNode {
   virtual void StopProducing() = 0;
 
  protected:
-  ExecNode(ExecPlan* plan, std::string label, std::vector<BatchDescr> input_descrs,
+  ExecNode(ExecPlan*, std::string label, NodeVector inputs,

Review comment:
       Nit: Seems odd to have no name for `ExecPlan` but then have a name for everything else.

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -61,13 +61,22 @@ Expression call(std::string function, std::vector<Expression> arguments,
   call.function_name = std::move(function);
   call.arguments = std::move(arguments);
   call.options = std::move(options);
+
+  call.hash = std::hash<std::string>{}(call.function_name);
+  for (const auto& arg : call.arguments) {
+    call.hash ^= arg.hash();
+  }
   return Expression(std::move(call));
 }
 
 const Datum* Expression::literal() const { return util::get_if<Datum>(impl_.get()); }
 
+const Expression::Parameter* Expression::parameter() const {

Review comment:
       Checking my knowledge.  A parameter is an index + an expected type?

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -613,6 +639,22 @@ std::vector<FieldRef> FieldsInExpression(const Expression& expr) {
   return fields;
 }
 
+std::vector<int> ParametersInExpression(const Expression& expr) {

Review comment:
       It seems this could return duplicate indices (e.g. in something like `x < 5 && x > 0`).  Is that a problem?

##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -204,5 +204,35 @@ arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
   return internal::checked_pointer_cast<T>(source);
 }
 
+class FragmentDataset : public Dataset {

Review comment:
       Should this be a base type of `InMemoryDataset`?

##########
File path: cpp/src/arrow/compute/exec/test_util.cc
##########
@@ -124,277 +130,42 @@ struct DummyNode : ExecNode {
   bool started_ = false;
 };
 
-struct RecordBatchReaderNode : ExecNode {
-  RecordBatchReaderNode(ExecPlan* plan, std::string label,
-                        std::shared_ptr<RecordBatchReader> reader, Executor* io_executor)
-      : ExecNode(plan, std::move(label), {}, {},
-                 DescrFromSchemaColumns(*reader->schema()), /*num_outputs=*/1),
-        schema_(reader->schema()),
-        reader_(std::move(reader)),
-        io_executor_(io_executor) {}
-
-  RecordBatchReaderNode(ExecPlan* plan, std::string label, std::shared_ptr<Schema> schema,
-                        RecordBatchGenerator generator, Executor* io_executor)
-      : ExecNode(plan, std::move(label), {}, {}, DescrFromSchemaColumns(*schema),
-                 /*num_outputs=*/1),
-        schema_(std::move(schema)),
-        generator_(std::move(generator)),
-        io_executor_(io_executor) {}
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) override {}
-
-  void ErrorReceived(ExecNode* input, Status error) override {}
-
-  void InputFinished(ExecNode* input, int seq_stop) override {}
-
-  Status StartProducing() override {
-    next_batch_index_ = 0;
-    if (!generator_) {
-      auto it = MakeIteratorFromReader(reader_);
-      ARROW_ASSIGN_OR_RAISE(generator_,
-                            MakeBackgroundGenerator(std::move(it), io_executor_));
-    }
-    GenerateOne(std::unique_lock<std::mutex>{mutex_});
-    return Status::OK();
-  }
-
-  void PauseProducing(ExecNode* output) override {}
-
-  void ResumeProducing(ExecNode* output) override {}
-
-  void StopProducing(ExecNode* output) override {
-    ASSERT_EQ(output, outputs_[0]);
-    std::unique_lock<std::mutex> lock(mutex_);
-    generator_ = nullptr;  // null function
-  }
-
-  void StopProducing() override { StopProducing(outputs_[0]); }
-
- private:
-  void GenerateOne(std::unique_lock<std::mutex>&& lock) {
-    if (!generator_) {
-      // Stopped
-      return;
-    }
-    auto plan = this->plan()->shared_from_this();
-    auto fut = generator_();
-    const auto batch_index = next_batch_index_++;
-
-    lock.unlock();
-    // TODO we want to transfer always here
-    io_executor_->Transfer(std::move(fut))
-        .AddCallback(
-            [plan, batch_index, this](const Result<std::shared_ptr<RecordBatch>>& res) {
-              std::unique_lock<std::mutex> lock(mutex_);
-              if (!res.ok()) {
-                for (auto out : outputs_) {
-                  out->ErrorReceived(this, res.status());
-                }
-                return;
-              }
-              const auto& batch = *res;
-              if (IsIterationEnd(batch)) {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputFinished(this, batch_index);
-                }
-              } else {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputReceived(this, batch_index, compute::ExecBatch(*batch));
-                }
-                lock.lock();
-                GenerateOne(std::move(lock));
-              }
-            });
-  }
-
-  std::mutex mutex_;
-  const std::shared_ptr<Schema> schema_;
-  const std::shared_ptr<RecordBatchReader> reader_;
-  RecordBatchGenerator generator_;
-  int next_batch_index_;
-
-  Executor* const io_executor_;
-};
-
-struct RecordBatchCollectNodeImpl : public RecordBatchCollectNode {
-  RecordBatchCollectNodeImpl(ExecPlan* plan, std::string label,
-                             std::shared_ptr<Schema> schema)
-      : RecordBatchCollectNode(plan, std::move(label), {DescrFromSchemaColumns(*schema)},
-                               {"batches_to_collect"}, {}, 0),
-        schema_(std::move(schema)) {}
-
-  RecordBatchGenerator generator() override { return generator_; }
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  Status StartProducing() override {
-    num_received_ = 0;
-    num_emitted_ = 0;
-    emit_stop_ = -1;
-    stopped_ = false;
-    producer_.emplace(generator_.producer());
-    return Status::OK();
-  }
-
-  // sink nodes have no outputs from which to feel backpressure
-  void ResumeProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void PauseProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void StopProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-
-  void StopProducing() override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputReceived(ExecNode* input, int seq_num,
-                     compute::ExecBatch exec_batch) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (stopped_) {
-      return;
-    }
-    auto maybe_batch = MakeBatch(std::move(exec_batch));
-    if (!maybe_batch.ok()) {
-      lock.unlock();
-      producer_->Push(std::move(maybe_batch));
-      return;
-    }
-
-    // TODO would be nice to factor this out in a ReorderQueue
-    auto batch = *std::move(maybe_batch);
-    if (seq_num <= static_cast<int>(received_batches_.size())) {
-      received_batches_.resize(seq_num + 1, nullptr);
-    }
-    DCHECK_EQ(received_batches_[seq_num], nullptr);
-    received_batches_[seq_num] = std::move(batch);
-    ++num_received_;
-
-    if (seq_num != num_emitted_) {
-      // Cannot emit yet as there is a hole at `num_emitted_`
-      DCHECK_GT(seq_num, num_emitted_);
-      DCHECK_EQ(received_batches_[num_emitted_], nullptr);
-      return;
-    }
-    if (num_received_ == emit_stop_) {
-      StopProducingUnlocked();
-    }
-
-    // Emit batches in order as far as possible
-    // First collect these batches, then unlock before producing.
-    const auto seq_start = seq_num;
-    while (seq_num < static_cast<int>(received_batches_.size()) &&
-           received_batches_[seq_num] != nullptr) {
-      ++seq_num;
-    }
-    DCHECK_GT(seq_num, seq_start);
-    // By moving the values now, we make sure another thread won't emit the same values
-    // below
-    RecordBatchVector to_emit(
-        std::make_move_iterator(received_batches_.begin() + seq_start),
-        std::make_move_iterator(received_batches_.begin() + seq_num));
-
-    lock.unlock();
-    for (auto&& batch : to_emit) {
-      producer_->Push(std::move(batch));
-    }
-    lock.lock();
-
-    DCHECK_EQ(seq_start, num_emitted_);  // num_emitted_ wasn't bumped in the meantime
-    num_emitted_ = seq_num;
-  }
-
-  void ErrorReceived(ExecNode* input, Status error) override {
-    // XXX do we care about properly sequencing the error?
-    producer_->Push(std::move(error));
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputFinished(ExecNode* input, int seq_stop) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    DCHECK_GE(seq_stop, static_cast<int>(received_batches_.size()));
-    received_batches_.reserve(seq_stop);
-    emit_stop_ = seq_stop;
-    if (emit_stop_ == num_received_) {
-      DCHECK_EQ(emit_stop_, num_emitted_);
-      StopProducingUnlocked();
-    }
-  }
-
- private:
-  void StopProducingUnlocked() {
-    if (!stopped_) {
-      stopped_ = true;
-      producer_->Close();
-      inputs_[0]->StopProducing(this);
-    }
-  }
-
-  // TODO factor this out as ExecBatch::ToRecordBatch()?
-  Result<std::shared_ptr<RecordBatch>> MakeBatch(compute::ExecBatch&& exec_batch) {
-    ArrayDataVector columns;
-    columns.reserve(exec_batch.values.size());
-    for (auto&& value : exec_batch.values) {
-      if (!value.is_array()) {
-        return Status::TypeError("Expected array input");
-      }
-      columns.push_back(std::move(value).array());
-    }
-    return RecordBatch::Make(schema_, exec_batch.length, std::move(columns));
-  }
-
-  const std::shared_ptr<Schema> schema_;
-
-  std::mutex mutex_;
-  RecordBatchVector received_batches_;
-  int num_received_;
-  int num_emitted_;
-  int emit_stop_;
-  bool stopped_;
-
-  PushGenerator<std::shared_ptr<RecordBatch>> generator_;
-  util::optional<PushGenerator<std::shared_ptr<RecordBatch>>::Producer> producer_;
-};
+AsyncGenerator<util::optional<ExecBatch>> Wrap(RecordBatchGenerator gen,
+                                               ::arrow::internal::Executor* io_executor) {
+  return MakeMappedGenerator(
+      MakeTransferredGenerator(std::move(gen), io_executor),
+      [](const std::shared_ptr<RecordBatch>& batch) -> util::optional<ExecBatch> {
+        return ExecBatch(*batch);
+      });
+}
 
 }  // namespace
 
 ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label,
-                                    std::shared_ptr<RecordBatchReader> reader,
-                                    Executor* io_executor) {
-  return plan->EmplaceNode<RecordBatchReaderNode>(plan, std::move(label),
-                                                  std::move(reader), io_executor);
+                                    const std::shared_ptr<Schema>& schema,
+                                    RecordBatchGenerator generator,
+                                    ::arrow::internal::Executor* io_executor) {
+  return MakeSourceNode(plan, std::move(label), DescrFromSchemaColumns(*schema),
+                        Wrap(std::move(generator), io_executor));
 }
 
 ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label,
-                                    std::shared_ptr<Schema> schema,
-                                    RecordBatchGenerator generator,
-                                    ::arrow::internal::Executor* io_executor) {
-  return plan->EmplaceNode<RecordBatchReaderNode>(
-      plan, std::move(label), std::move(schema), std::move(generator), io_executor);
+                                    const std::shared_ptr<RecordBatchReader>& reader,
+                                    Executor* io_executor) {
+  auto gen =
+      MakeBackgroundGenerator(MakeIteratorFromReader(reader), io_executor).ValueOrDie();

Review comment:
       Same question, why `io_executor`?

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -510,7 +475,67 @@ Result<Expression> Expression::Bind(const Schema& in_schema,
   return Bind(ValueDescr::Array(struct_(in_schema.fields())), exec_context);
 }
 
-Result<Datum> ExecuteScalarExpression(const Expression& expr, const Datum& input,
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial) {
+  ExecBatch out;
+
+  if (partial.kind() == Datum::RECORD_BATCH) {
+    const auto& partial_batch = *partial.record_batch();
+    out.length = partial_batch.num_rows();
+
+    for (const auto& field : full_schema.fields()) {
+      ARROW_ASSIGN_OR_RAISE(auto column,
+                            FieldRef(field->name()).GetOneOrNone(partial_batch));
+
+      if (column) {
+        if (!column->type()->Equals(field->type())) {
+          // Referenced field was present but didn't have the expected type.
+          // This *should* be handled by readers, and will just be an error in the future.
+          ARROW_ASSIGN_OR_RAISE(
+              auto converted,
+              compute::Cast(column, field->type(), compute::CastOptions::Safe()));
+          column = converted.make_array();
+        }
+        out.values.emplace_back(std::move(column));
+      } else {
+        out.values.emplace_back(MakeNullScalar(field->type()));
+      }
+    }
+    return out;
+  }
+
+  // wasteful but useful for testing:
+  if (partial.type()->id() == Type::STRUCT) {
+    if (partial.is_array()) {
+      ARROW_ASSIGN_OR_RAISE(auto partial_batch,
+                            RecordBatch::FromStructArray(partial.make_array()));
+
+      return MakeExecBatch(full_schema, partial_batch);
+    }
+
+    if (partial.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(auto partial_array,
+                            MakeArrayFromScalar(*partial.scalar(), 1));
+      ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array));
+
+      for (Datum& value : out.values) {
+        if (value.is_scalar()) continue;
+        ARROW_ASSIGN_OR_RAISE(value, value.make_array()->GetScalar(0));
+      }

Review comment:
       I'm not sure what is going on here (though that is likely my own problem).  If the value is a scalar record batch, you want to end up with each value being a scalar.  Can you not just grab the first item from each column of `partial_array`?  Why do you need to go back in and patch things up?

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -510,7 +475,67 @@ Result<Expression> Expression::Bind(const Schema& in_schema,
   return Bind(ValueDescr::Array(struct_(in_schema.fields())), exec_context);
 }
 
-Result<Datum> ExecuteScalarExpression(const Expression& expr, const Datum& input,
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial) {
+  ExecBatch out;
+
+  if (partial.kind() == Datum::RECORD_BATCH) {
+    const auto& partial_batch = *partial.record_batch();
+    out.length = partial_batch.num_rows();
+
+    for (const auto& field : full_schema.fields()) {
+      ARROW_ASSIGN_OR_RAISE(auto column,
+                            FieldRef(field->name()).GetOneOrNone(partial_batch));

Review comment:
       Will this fail if the column names are not unique?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -258,43 +258,27 @@ class MappingGenerator {
 /// Note: Errors returned from the `map` function will be propagated
 ///
 /// If the source generator is async-reentrant then this generator will be also
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
-                                      std::function<Result<V>(const T&)> map) {
-  std::function<Future<V>(const T&)> future_map = [map](const T& val) -> Future<V> {
-    return Future<V>::MakeFinished(map(val));
-  };
-  return MappingGenerator<T, V>(std::move(source_generator), std::move(future_map));
-}
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
-                                      std::function<V(const T&)> map) {
-  std::function<Future<V>(const T&)> maybe_future_map = [map](const T& val) -> Future<V> {
-    return Future<V>::MakeFinished(map(val));
-  };
-  return MappingGenerator<T, V>(std::move(source_generator), std::move(maybe_future_map));
-}
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
-                                      std::function<Future<V>(const T&)> map) {
-  return MappingGenerator<T, V>(std::move(source_generator), std::move(map));
-}
-
-template <typename V, typename T, typename MapFunc>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFunc map) {
+template <typename T, typename MapFn,

Review comment:
       Thanks for figuring this out!

##########
File path: cpp/src/arrow/compute/exec/expression.h
##########
@@ -207,11 +218,22 @@ Result<Expression> SimplifyWithGuarantee(Expression,
 
 // Execution
 
-/// Execute a scalar expression against the provided state and input Datum. This
+/// Ensure that a RecordBatch (which may have missing or incorrectly ordered columns)

Review comment:
       This reads like an explanatory NB more than a description of what the function does.  Maybe...
   
   ```
   Converts a RecordBatch to an ExecBatch
   
   Arrays will be reordered according to schema ordering.  Missing fields will be replaced with null scalars.  This is necessary when executing expressions since we look up fields by index.
   ```
   
   Also, it feels a bit like a catch-22, though this is more of an observation than a complaint.  "We need to change expressions to use indices so they will work on exec batches." and "We need to convert record batches to exec batches because expressions work on indices."
   
   Maybe just remove the `This is necessary...` statement.  By this point the user is already making an exec batch so presumably they have a reason for it.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -225,22 +212,43 @@ class ARROW_EXPORT ExecNode {
   virtual void StopProducing() = 0;
 
  protected:
-  ExecNode(ExecPlan* plan, std::string label, std::vector<BatchDescr> input_descrs,
+  ExecNode(ExecPlan*, std::string label, NodeVector inputs,
            std::vector<std::string> input_labels, BatchDescr output_descr,
            int num_outputs);
 
   ExecPlan* plan_;
-
   std::string label_;
 
-  std::vector<BatchDescr> input_descrs_;
-  std::vector<std::string> input_labels_;
   NodeVector inputs_;
+  std::vector<std::string> input_labels_;
 
   BatchDescr output_descr_;
   int num_outputs_;
   NodeVector outputs_;
 };
 
+/// \brief Adapt an AsyncGenerator<ExecBatch> as a source node
+ARROW_EXPORT
+ExecNode* MakeSourceNode(ExecPlan*, std::string label, ExecNode::BatchDescr output_descr,
+                         AsyncGenerator<util::optional<ExecBatch>>);
+
+/// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
+ARROW_EXPORT
+AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
+                                                       std::string label);
+
+/// \brief Make a node which excludes some rows from batches passed through it
+///
+/// filter Expression must be bound; no field references will be looked up by name
+ARROW_EXPORT
+ExecNode* MakeFilterNode(ExecNode* input, std::string label, Expression filter);
+
+/// \brief Make a node which executes expressions on input batches, producing new batches.
+///
+/// Expressions must be bound; no field references will be looked up by name
+ARROW_EXPORT
+ExecNode* MakeProjectNode(ExecNode* input, std::string label,

Review comment:
       I think I understand what is going on here from the meaning of `Project`, but someone less familiar with the domain might not.  It's not clear that the output will have one output column per expression.  From the description this sounds like it might be more of a generic "map" operation that maps input batch to output batch.

##########
File path: cpp/src/arrow/compute/exec/expression_test.cc
##########
@@ -165,6 +165,56 @@ TEST(ExpressionUtils, StripOrderPreservingCasts) {
   Expect(cast(field_ref("i32"), uint64()), no_change);
 }
 
+TEST(ExpressionUtils, MakeExecBatch) {
+  auto Expect = [](std::shared_ptr<RecordBatch> partial_batch) {
+    SCOPED_TRACE(partial_batch->ToString());
+    ASSERT_OK_AND_ASSIGN(auto batch, MakeExecBatch(*kBoringSchema, partial_batch));
+
+    ASSERT_EQ(batch.num_values(), kBoringSchema->num_fields());
+    for (int i = 0; i < kBoringSchema->num_fields(); ++i) {
+      const auto& field = *kBoringSchema->field(i);
+
+      SCOPED_TRACE("Field#" + std::to_string(i) + " " + field.ToString());
+
+      EXPECT_TRUE(batch[i].type()->Equals(field.type()))
+          << "Incorrect type " << batch[i].type()->ToString();
+
+      ASSERT_OK_AND_ASSIGN(auto col, FieldRef(field.name()).GetOneOrNone(*partial_batch));

Review comment:
       Why not `partial_batch->GetFieldByName(field.name())`?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -66,6 +66,12 @@ bool IsIterationEnd(const T& val) {
   return IterationTraits<T>::IsEnd(val);
 }
 
+template <typename T>
+bool IsIterationEnd(const Result<T>& maybe_val) {

Review comment:
       Where is this used?

##########
File path: cpp/src/arrow/compute/exec/test_util.cc
##########
@@ -124,277 +130,42 @@ struct DummyNode : ExecNode {
   bool started_ = false;
 };
 
-struct RecordBatchReaderNode : ExecNode {
-  RecordBatchReaderNode(ExecPlan* plan, std::string label,
-                        std::shared_ptr<RecordBatchReader> reader, Executor* io_executor)
-      : ExecNode(plan, std::move(label), {}, {},
-                 DescrFromSchemaColumns(*reader->schema()), /*num_outputs=*/1),
-        schema_(reader->schema()),
-        reader_(std::move(reader)),
-        io_executor_(io_executor) {}
-
-  RecordBatchReaderNode(ExecPlan* plan, std::string label, std::shared_ptr<Schema> schema,
-                        RecordBatchGenerator generator, Executor* io_executor)
-      : ExecNode(plan, std::move(label), {}, {}, DescrFromSchemaColumns(*schema),
-                 /*num_outputs=*/1),
-        schema_(std::move(schema)),
-        generator_(std::move(generator)),
-        io_executor_(io_executor) {}
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) override {}
-
-  void ErrorReceived(ExecNode* input, Status error) override {}
-
-  void InputFinished(ExecNode* input, int seq_stop) override {}
-
-  Status StartProducing() override {
-    next_batch_index_ = 0;
-    if (!generator_) {
-      auto it = MakeIteratorFromReader(reader_);
-      ARROW_ASSIGN_OR_RAISE(generator_,
-                            MakeBackgroundGenerator(std::move(it), io_executor_));
-    }
-    GenerateOne(std::unique_lock<std::mutex>{mutex_});
-    return Status::OK();
-  }
-
-  void PauseProducing(ExecNode* output) override {}
-
-  void ResumeProducing(ExecNode* output) override {}
-
-  void StopProducing(ExecNode* output) override {
-    ASSERT_EQ(output, outputs_[0]);
-    std::unique_lock<std::mutex> lock(mutex_);
-    generator_ = nullptr;  // null function
-  }
-
-  void StopProducing() override { StopProducing(outputs_[0]); }
-
- private:
-  void GenerateOne(std::unique_lock<std::mutex>&& lock) {
-    if (!generator_) {
-      // Stopped
-      return;
-    }
-    auto plan = this->plan()->shared_from_this();
-    auto fut = generator_();
-    const auto batch_index = next_batch_index_++;
-
-    lock.unlock();
-    // TODO we want to transfer always here
-    io_executor_->Transfer(std::move(fut))
-        .AddCallback(
-            [plan, batch_index, this](const Result<std::shared_ptr<RecordBatch>>& res) {
-              std::unique_lock<std::mutex> lock(mutex_);
-              if (!res.ok()) {
-                for (auto out : outputs_) {
-                  out->ErrorReceived(this, res.status());
-                }
-                return;
-              }
-              const auto& batch = *res;
-              if (IsIterationEnd(batch)) {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputFinished(this, batch_index);
-                }
-              } else {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputReceived(this, batch_index, compute::ExecBatch(*batch));
-                }
-                lock.lock();
-                GenerateOne(std::move(lock));
-              }
-            });
-  }
-
-  std::mutex mutex_;
-  const std::shared_ptr<Schema> schema_;
-  const std::shared_ptr<RecordBatchReader> reader_;
-  RecordBatchGenerator generator_;
-  int next_batch_index_;
-
-  Executor* const io_executor_;
-};
-
-struct RecordBatchCollectNodeImpl : public RecordBatchCollectNode {
-  RecordBatchCollectNodeImpl(ExecPlan* plan, std::string label,
-                             std::shared_ptr<Schema> schema)
-      : RecordBatchCollectNode(plan, std::move(label), {DescrFromSchemaColumns(*schema)},
-                               {"batches_to_collect"}, {}, 0),
-        schema_(std::move(schema)) {}
-
-  RecordBatchGenerator generator() override { return generator_; }
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  Status StartProducing() override {
-    num_received_ = 0;
-    num_emitted_ = 0;
-    emit_stop_ = -1;
-    stopped_ = false;
-    producer_.emplace(generator_.producer());
-    return Status::OK();
-  }
-
-  // sink nodes have no outputs from which to feel backpressure
-  void ResumeProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void PauseProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void StopProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-
-  void StopProducing() override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputReceived(ExecNode* input, int seq_num,
-                     compute::ExecBatch exec_batch) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (stopped_) {
-      return;
-    }
-    auto maybe_batch = MakeBatch(std::move(exec_batch));
-    if (!maybe_batch.ok()) {
-      lock.unlock();
-      producer_->Push(std::move(maybe_batch));
-      return;
-    }
-
-    // TODO would be nice to factor this out in a ReorderQueue
-    auto batch = *std::move(maybe_batch);
-    if (seq_num <= static_cast<int>(received_batches_.size())) {
-      received_batches_.resize(seq_num + 1, nullptr);
-    }
-    DCHECK_EQ(received_batches_[seq_num], nullptr);
-    received_batches_[seq_num] = std::move(batch);
-    ++num_received_;
-
-    if (seq_num != num_emitted_) {
-      // Cannot emit yet as there is a hole at `num_emitted_`
-      DCHECK_GT(seq_num, num_emitted_);
-      DCHECK_EQ(received_batches_[num_emitted_], nullptr);
-      return;
-    }
-    if (num_received_ == emit_stop_) {
-      StopProducingUnlocked();
-    }
-
-    // Emit batches in order as far as possible
-    // First collect these batches, then unlock before producing.
-    const auto seq_start = seq_num;
-    while (seq_num < static_cast<int>(received_batches_.size()) &&
-           received_batches_[seq_num] != nullptr) {
-      ++seq_num;
-    }
-    DCHECK_GT(seq_num, seq_start);
-    // By moving the values now, we make sure another thread won't emit the same values
-    // below
-    RecordBatchVector to_emit(
-        std::make_move_iterator(received_batches_.begin() + seq_start),
-        std::make_move_iterator(received_batches_.begin() + seq_num));
-
-    lock.unlock();
-    for (auto&& batch : to_emit) {
-      producer_->Push(std::move(batch));
-    }
-    lock.lock();
-
-    DCHECK_EQ(seq_start, num_emitted_);  // num_emitted_ wasn't bumped in the meantime
-    num_emitted_ = seq_num;
-  }
-
-  void ErrorReceived(ExecNode* input, Status error) override {
-    // XXX do we care about properly sequencing the error?
-    producer_->Push(std::move(error));
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputFinished(ExecNode* input, int seq_stop) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    DCHECK_GE(seq_stop, static_cast<int>(received_batches_.size()));
-    received_batches_.reserve(seq_stop);
-    emit_stop_ = seq_stop;
-    if (emit_stop_ == num_received_) {
-      DCHECK_EQ(emit_stop_, num_emitted_);
-      StopProducingUnlocked();
-    }
-  }
-
- private:
-  void StopProducingUnlocked() {
-    if (!stopped_) {
-      stopped_ = true;
-      producer_->Close();
-      inputs_[0]->StopProducing(this);
-    }
-  }
-
-  // TODO factor this out as ExecBatch::ToRecordBatch()?
-  Result<std::shared_ptr<RecordBatch>> MakeBatch(compute::ExecBatch&& exec_batch) {
-    ArrayDataVector columns;
-    columns.reserve(exec_batch.values.size());
-    for (auto&& value : exec_batch.values) {
-      if (!value.is_array()) {
-        return Status::TypeError("Expected array input");
-      }
-      columns.push_back(std::move(value).array());
-    }
-    return RecordBatch::Make(schema_, exec_batch.length, std::move(columns));
-  }
-
-  const std::shared_ptr<Schema> schema_;
-
-  std::mutex mutex_;
-  RecordBatchVector received_batches_;
-  int num_received_;
-  int num_emitted_;
-  int emit_stop_;
-  bool stopped_;
-
-  PushGenerator<std::shared_ptr<RecordBatch>> generator_;
-  util::optional<PushGenerator<std::shared_ptr<RecordBatch>>::Producer> producer_;
-};
+AsyncGenerator<util::optional<ExecBatch>> Wrap(RecordBatchGenerator gen,
+                                               ::arrow::internal::Executor* io_executor) {
+  return MakeMappedGenerator(
+      MakeTransferredGenerator(std::move(gen), io_executor),

Review comment:
       Why are you transferring onto the `io_executor` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org