Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/06 15:28:11 UTC

[GitHub] [arrow] bkietz opened a new pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

bkietz opened a new pull request #10664:
URL: https://github.com/apache/arrow/pull/10664


   Replaces the body of AsyncScanner::ScanBatchesAsync with usage of an ExecPlan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
ursabot commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874888474


   Benchmark runs are scheduled for baseline = 780e95c512d63bbea1e040af0eb44a0bf63c4d72 and contender = c1808f9156bd4df03cf8338217ffd93f519abb96. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Scheduled] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/399dd7ae47d345969dfba70ba5d60fae...153e1fbed2d04daebb6acdf1c2389903/)
   [Scheduled] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/eef93c4bfb1d4a8b92351f41883dfd55...6f6c2fcd5d0046bd8e70799566145aaa/)
   [Skipped :warning: Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/e5af519d38bd4cc483cd45f86750d807...12bcd567e1074dcbb6aa56f22219c4de/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   





[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874887902


   @ursabot please benchmark lang=Python





[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664773055



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -729,20 +792,72 @@ Future<std::shared_ptr<Table>> AsyncScanner::ToTableAsync(
   });
 }
 
+namespace {
+Result<int64_t> GetSelectionSize(const Datum& selection, int64_t length) {
+  if (length == 0) return 0;
+
+  if (selection.is_scalar()) {
+    if (!selection.scalar()->is_valid) return 0;
+    if (!selection.scalar_as<BooleanScalar>().value) return 0;
+    return length;
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto count, compute::Sum(selection));
+  return static_cast<int64_t>(count.scalar_as<UInt64Scalar>().value);
+}
+}  // namespace
+
 Result<int64_t> AsyncScanner::CountRows() {
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  ARROW_ASSIGN_OR_RAISE(auto count_gen_gen,
-                        FragmentsToRowCount(std::move(fragment_gen), scan_options_));
-  auto count_gen = MakeConcatenatedGenerator(std::move(count_gen_gen));
-  int64_t total = 0;
-  auto sum_fn = [&total](util::optional<int64_t> count) -> Status {
-    if (count.has_value()) total += *count;
-    return Status::OK();
-  };
-  RETURN_NOT_OK(VisitAsyncGenerator<util::optional<int64_t>>(std::move(count_gen),
-                                                             std::move(sum_fn))
-                    .status());
-  return total;
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+
+  std::atomic<int64_t> total{0};
+
+  fragment_gen = MakeMappedGenerator(
+      std::move(fragment_gen), [&](const std::shared_ptr<Fragment>& fragment) {
+        return fragment->CountRows(scan_options_->filter, scan_options_)
+            .Then([&, fragment](util::optional<int64_t> fast_count) mutable
+                  -> std::shared_ptr<Fragment> {
+              if (fast_count) {
+                // fast path: got row count directly; skip scanning this fragment
+                total += *fast_count;
+                return std::make_shared<OneShotFragment>(
+                    scan_options_->dataset_schema,
+                    MakeEmptyIterator<std::shared_ptr<RecordBatch>>());
+              }
+
+              // slow path: actually filter this fragment's batches
+              return std::move(fragment);
+            });
+      });
+
+  ARROW_ASSIGN_OR_RAISE(auto scan,
+                        MakeScanNode(plan.get(), std::move(fragment_gen), scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto get_selection,
+      compute::MakeProjectNode(scan, "get_selection", {scan_options_->filter}));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(get_selection, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  RETURN_NOT_OK(
+      VisitAsyncGenerator(std::move(sink_gen),
+                          [&](const util::optional<compute::ExecBatch>& batch) {
+                            // TODO replace with scalar aggregation node
+                            ARROW_ASSIGN_OR_RAISE(
+                                int64_t slow_count,
+                                GetSelectionSize(batch->values[0], batch->length));
+                            total += slow_count;
+                            return Status::OK();
+                          })
+          .status());
+
+  plan->finished().Wait();

Review comment:
       Should we raise errors encountered here?
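To make the question above concrete, here is a minimal sketch of the difference between waiting on a completion future and returning its outcome. `ToyStatus` and `ToyFinishedFuture` are hypothetical stand-ins written for this illustration, not Arrow's `Status` or `Future<>`; the only assumption carried over from the hunk is that `Wait()` blocks without yielding a status.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for a status type (NOT arrow::Status).
struct ToyStatus {
  bool is_ok;
  std::string message;
  bool ok() const { return is_ok; }
};

// Hypothetical stand-in for an already-completed future (NOT arrow::Future<>).
class ToyFinishedFuture {
 public:
  explicit ToyFinishedFuture(ToyStatus st) : status_(std::move(st)) {}
  // Blocks until completion in a real future; here it is a no-op.
  // Note that it returns nothing, so the outcome is invisible to the caller.
  void Wait() const {}
  // Exposes the outcome of the finished work.
  const ToyStatus& status() const { return status_; }

 private:
  ToyStatus status_;
};

// Mirrors the shape of the code under review: wait, then report success
// regardless of what happened.
inline ToyStatus FinishIgnoringErrors(const ToyFinishedFuture& finished) {
  finished.Wait();
  return ToyStatus{true, ""};
}

// Alternative: wait, then hand the outcome back to the caller.
inline ToyStatus FinishPropagatingErrors(const ToyFinishedFuture& finished) {
  finished.Wait();
  return finished.status();
}
```

With a failed future, the first variant still reports success while the second surfaces the failure message to the caller.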







[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r665758000



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       There's an issue (ARROW-12938) for letting a StopToken run a callback, since that's needed to support other things like gRPC (which also exposes a cancel method instead of taking a cancellable context). But that'd be more work and requires dealing with signal safety in the callback.







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666201112



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer if not.
-                // This can happen for in-memory scans where batches didn't require
-                // any CPU work to decode. Otherwise, parsing etc should have already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {

Review comment:
       Is there a reason not to use Loop, though?







[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874968900


   @ursabot please benchmark lang=Python





[GitHub] [arrow] westonpace commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664822735



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -506,73 +444,116 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
         return EnumeratedRecordBatch{record_batch, fragment};
       };
 
-  auto combined_gen = MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
-
-  if (filter_and_project) {
-    return FilterAndProjectRecordBatchAsync(options, std::move(combined_gen));
-  }
-  return combined_gen;
+  return MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
 }
 
 Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
-    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options,
-    bool filter_and_project = true) {
+    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
   auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
   return MakeMappedGenerator(std::move(enumerated_fragment_gen),
                              [=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
-                               return FragmentToBatches(fragment, options,
-                                                        filter_and_project);
+                               return FragmentToBatches(fragment, options);
                              });
 }
 
-Result<AsyncGenerator<AsyncGenerator<util::optional<int64_t>>>> FragmentsToRowCount(
-    FragmentGenerator fragment_gen,
-    std::shared_ptr<ScanOptions> options_with_projection) {
-  // Must use optional<int64_t> to avoid breaking the pipeline on empty batches
-  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
 
-  // Drop projection since we only need to count rows
-  auto options = std::make_shared<ScanOptions>(*options_with_projection);
-  RETURN_NOT_OK(SetProjection(options.get(), std::vector<std::string>()));
-
-  auto count_fragment_fn =
-      [options](const Enumerated<std::shared_ptr<Fragment>>& fragment)
-      -> Result<AsyncGenerator<util::optional<int64_t>>> {
-    auto count_fut = fragment.value->CountRows(options->filter, options);
-    return MakeFromFuture(
-        count_fut.Then([=](util::optional<int64_t> val)
-                           -> Result<AsyncGenerator<util::optional<int64_t>>> {
-          // Fast path
-          if (val.has_value()) {
-            return MakeSingleFutureGenerator(
-                Future<util::optional<int64_t>>::MakeFinished(val));
-          }
-          // Slow path
-          ARROW_ASSIGN_OR_RAISE(auto batch_gen, FragmentToBatches(fragment, options));
-          auto count_fn =
-              [](const EnumeratedRecordBatch& enumerated) -> util::optional<int64_t> {
-            return enumerated.record_batch.value->num_rows();
-          };
-          return MakeMappedGenerator(batch_gen, std::move(count_fn));
-        }));
-  };
-  return MakeMappedGenerator(std::move(enumerated_fragment_gen),
-                             std::move(count_fragment_fn));
-}
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(std::move(fragment_gen), options));
 
-Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsyncImpl(
-    const std::shared_ptr<ScanOptions>& options, FragmentGenerator fragment_gen,
-    internal::Executor* cpu_executor, bool filter_and_project = true) {
-  ARROW_ASSIGN_OR_RAISE(
-      auto batch_gen_gen,
-      FragmentsToBatches(std::move(fragment_gen), options, filter_and_project));
-  auto batch_gen_gen_readahead =
-      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);

Review comment:
       Ben and I spoke about this.  The serial readahead wasn't actually adding anything (batch_gen_gen is really just listing fragments) and it was invalid because a serial readahead generator is not async reentrant and should not be consumed by MakeMergedGenerator.







[GitHub] [arrow] github-actions[bot] commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874860071


   https://issues.apache.org/jira/browse/ARROW-13238





[GitHub] [arrow] westonpace commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r665732675



##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -65,16 +61,24 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
 
   Status Validate();
 
-  /// Start producing on all nodes
+  /// \brief Start producing on all nodes
   ///
   /// Nodes are started in reverse topological order, such that any node
   /// is started before all of its inputs.
   Status StartProducing();
 
+  /// \brief Stop producing on all nodes
+  ///
+  /// Nodes are stopped topological order, such that any node

Review comment:
       **in** topological order

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -234,7 +241,7 @@ Result<std::vector<ExecBatch>> StartAndCollect(
   auto maybe_collected = CollectAsyncGenerator(gen).result();
   ARROW_ASSIGN_OR_RAISE(auto collected, maybe_collected);
 
-  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       ASSERT_FINISHES_OK

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -144,7 +144,14 @@ TEST(ExecPlan, DummyStartProducing) {
   // Note that any correct reverse topological order may do
   ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
                                      "source2", "source1"));
-  ASSERT_EQ(t.stopped.size(), 0);
+
+  plan->StopProducing();
+  plan->finished().Wait();
+  ASSERT_THAT(t.stopped, ElementsAre("source1", "source2", "process1", "process2",

Review comment:
       Do you need the same comment as above?  Is "source2", "source1", ... valid too?
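One way to reason about which stop orders are acceptable is to check a candidate order against the plan's edges. The sketch below is a standalone illustration: `IsTopologicalOrder` and `ExampleEdges` are hypothetical helpers, and the edge list is an assumed graph chosen for the example, not necessarily the one built in this test.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// (producer, consumer): the producer must come first in a topological order.
using Edge = std::pair<std::string, std::string>;

// Returns true if `order` lists every node named in `edges` exactly once
// and every producer appears before its consumer.
inline bool IsTopologicalOrder(const std::vector<Edge>& edges,
                               const std::vector<std::string>& order) {
  std::unordered_map<std::string, std::size_t> position;
  for (std::size_t i = 0; i < order.size(); ++i) {
    // A node listed twice cannot be part of a valid order.
    if (!position.emplace(order[i], i).second) return false;
  }
  for (const auto& edge : edges) {
    auto from = position.find(edge.first);
    auto to = position.find(edge.second);
    if (from == position.end() || to == position.end()) return false;
    if (from->second >= to->second) return false;
  }
  return true;
}

// Assumed example graph (hypothetical): two independent sources feeding
// a chain of processing nodes and a sink.
inline std::vector<Edge> ExampleEdges() {
  return std::vector<Edge>{Edge{"source1", "process1"},
                           Edge{"source2", "process2"},
                           Edge{"process1", "process2"},
                           Edge{"process2", "process3"},
                           Edge{"process3", "sink"}};
}
```

Under that assumed graph the two sources have no edge between them, so both "source1", "source2", ... and "source2", "source1", ... pass the check, which suggests a note like the one on the start order would apply to the stop order too.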

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1102,7 +1102,7 @@ static Result<std::vector<compute::ExecBatch>> StartAndCollect(
   auto maybe_collected = CollectAsyncGenerator(gen).result();
   ARROW_ASSIGN_OR_RAISE(auto collected, maybe_collected);
 
-  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       ASSERT_FINISHES_OK

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,6 +272,8 @@ class ConcreteFutureImpl : public FutureImpl {
         return true;
       case ShouldSchedule::IfUnfinished:
         return !in_add_callback;
+      case ShouldSchedule::IfDifferentExecutor:
+        return !callback_record.options.executor->OwnsThisThread();

Review comment:
       Is this speculative?  Or is it actually needed?

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer if not.
-                // This can happen for in-memory scans where batches didn't require
-                // any CPU work to decode. Otherwise, parsing etc should have already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {
+                  std::unique_lock<std::mutex> lock(mutex_);
+                  int seq = batch_count_++;
+                  if (stop_requested_) {
+                    return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+                  }
                   lock.unlock();
-                  // unless we were already finished, push the error to our output
-                  // XXX is this correct? Is it reasonable for a consumer to
-                  // ignore errors from a finished producer?
-                  outputs_[0]->ErrorReceived(this, error);
-                }
-                return Break(seq);
-              });
-        }).Then([&](int seq) {
-          /// XXX this is probably redundant: do we always call InputFinished after
-          /// ErrorReceived or will ErrorRecieved be sufficient?
-          outputs_[0]->InputFinished(this, seq);
-        });
+
+                  return generator_().Then(
+                      [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
+                        std::unique_lock<std::mutex> lock(mutex_);
+                        if (!batch || stop_requested_) {

Review comment:
       Nit: `IsIterationEnd(batch)` may capture the intent better.

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -332,6 +339,38 @@ TEST(ExecPlanExecution, StressSourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, StressSourceSinkStopped) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = slow && !parallel ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+      auto random_data = MakeRandomBatches(
+          schema({field("a", int32()), field("b", boolean())}), num_batches);
+
+      ASSERT_OK_AND_ASSIGN(auto source, MakeTestSourceNode(plan.get(), "source",
+                                                           random_data, parallel, slow));
+
+      auto sink_gen = MakeSinkNode(source, "sink");
+
+      ASSERT_OK(plan->Validate());
+      ASSERT_OK(plan->StartProducing());
+
+      auto maybe_first_batch = sink_gen().result();
+
+      plan->StopProducing();
+      plan->finished().Wait();

Review comment:
       ASSERT_FINISHES_OK

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -485,35 +528,12 @@ class Future {
   ///
   /// In this example `fut` falls out of scope but is not destroyed because it holds a
   /// cyclic reference to itself through the callback.
-  template <typename OnComplete>
-  typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete,
-              CallbackOptions opts = CallbackOptions::Defaults()) const {
+  template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>>
+  void AddCallback(OnComplete on_complete,
+                   CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
     // weak reference to impl_ here
-    struct Callback {
-      void operator()(const FutureImpl& impl) && {
-        std::move(on_complete)(*impl.CastResult<ValueType>());
-      }
-      OnComplete on_complete;
-    };
-    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
-  }
-
-  /// Overload for callbacks accepting a Status
-  template <typename OnComplete>
-  typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete,

Review comment:
       Thanks for getting rid of these repetitive overloads

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;

Review comment:
       Why wouldn't `ShouldSchedule::Always` work?
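For readers comparing the two policies, here is a toy model of the scheduling decision shown in the `future.cc` hunk. `ToyShouldSchedule`, `ToyExecutor` and `ShouldScheduleCallback` are hypothetical names for this sketch, and only the three cases visible in that hunk are modeled; it is not Arrow's implementation.

```cpp
// Hypothetical mirror of the policies visible in the hunk (NOT Arrow's enum).
enum class ToyShouldSchedule { IfUnfinished, Always, IfDifferentExecutor };

// Hypothetical executor that only knows whether the calling thread is one
// of its own workers.
struct ToyExecutor {
  bool owns_this_thread;
  bool OwnsThisThread() const { return owns_this_thread; }
};

// Returns true if the callback should be submitted to `executor` as a new
// task, false if it may run inline on the current thread.
inline bool ShouldScheduleCallback(ToyShouldSchedule policy, bool in_add_callback,
                                   const ToyExecutor& executor) {
  switch (policy) {
    case ToyShouldSchedule::Always:
      return true;
    case ToyShouldSchedule::IfUnfinished:
      return !in_add_callback;
    case ToyShouldSchedule::IfDifferentExecutor:
      return !executor.OwnsThisThread();
  }
  return false;  // unreachable for valid enum values
}
```

In this model `Always` yields the same result whenever the callback fires off the executor, but it also enqueues a task when the thread already belongs to the executor, whereas `IfDifferentExecutor` runs the callback inline in that case. Whether that extra hop matters in practice is a question for the benchmarks.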

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer if not.
-                // This can happen for in-memory scans where batches didn't require
-                // any CPU work to decode. Otherwise, parsing etc should have already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {

Review comment:
       Rather than use `Loop` directly, it seems you could use `MakeTransferred` and `VisitAsyncGenerator`. You might need to add `ErrorVisitor` and `stop_callback` support to `VisitAsyncGenerator`, but then it would be more universally supported.

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -332,6 +339,38 @@ TEST(ExecPlanExecution, StressSourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, StressSourceSinkStopped) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = slow && !parallel ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+      auto random_data = MakeRandomBatches(
+          schema({field("a", int32()), field("b", boolean())}), num_batches);
+
+      ASSERT_OK_AND_ASSIGN(auto source, MakeTestSourceNode(plan.get(), "source",
+                                                           random_data, parallel, slow));
+
+      auto sink_gen = MakeSinkNode(source, "sink");
+
+      ASSERT_OK(plan->Validate());
+      ASSERT_OK(plan->StartProducing());
+
+      auto maybe_first_batch = sink_gen().result();

Review comment:
       Use `ASSERT_FINISHES_OK_AND_ASSIGN` here.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{
+      nullptr, [plan, exec_context](...) {
+        bool not_finished_yet = plan->finished().TryAddCallback([&] {

Review comment:
       I don't know that `TryAddCallback` will prevent you from calling `StopProducing` twice if that is what you are trying to avoid here.  Can `StopProducing` be idempotent?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{

Review comment:
       I don't think this is the first time this has come up.  Maybe a helper function `AsyncGenerator<T> WithShutdown(AsyncGenerator<T>, Callback)`
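
A minimal sketch of what such a helper might look like, assuming a generator can be modeled as a copyable callable. `Generator<T>` is a hypothetical stand-in for `AsyncGenerator<T>` (it returns values directly rather than futures), and `WithShutdown` only follows the signature suggested in the comment; neither is an existing Arrow API. The sketch reuses the `std::shared_ptr<void>` custom-deleter trick from the quoted diff, but unlike the PR it runs the callback unconditionally rather than skipping it when the plan has already finished.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for AsyncGenerator<T>: a copyable callable that
// returns the next value directly (the real generator returns futures).
template <typename T>
using Generator = std::function<T()>;

// Hypothetical helper: wraps `source` so that `on_shutdown` runs once,
// when the last copy of the returned generator is destroyed.
template <typename T>
Generator<T> WithShutdown(Generator<T> source, std::function<void()> on_shutdown) {
  // A shared_ptr that owns no object but carries a deleter; the deleter runs
  // when the last generator copy holding `guard` goes away.
  std::shared_ptr<void> guard(nullptr, [on_shutdown](void*) { on_shutdown(); });
  return [source, guard]() { return source(); };
}

// Usage: the callback does not fire while any copy of the generator is alive.
inline void ExampleWithShutdown() {
  int shutdowns = 0;
  {
    Generator<int> gen =
        WithShutdown<int>([] { return 7; }, [&shutdowns] { ++shutdowns; });
    Generator<int> copy = gen;
    assert(gen() == 7);
    assert(shutdowns == 0);
  }
  assert(shutdowns == 1);
}
```

Tying the callback to the lifetime of the generator copies means callers never have to remember to stop the producer explicitly, at the cost of running the callback even when the generator was fully drained.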

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       Instead of exposing a `StopProducing` method, you might need to take in a stop token. If you don't, how will you handle the following?
   
   A pyarrow user runs `to_table` on some dataset. First it has to do inspection, then it has to actually do the scan. At any point the user might press Ctrl-C to cancel the operation.
   
   We can support this with a stop token by setting the Ctrl-C stop token handler and then passing that stop token down to both the inspection call and the scan call.
   
   As a minor benefit, you can get rid of all the locks here because the locking is handled inside the stop token itself.
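
The scenario above can be sketched with a shared atomic flag. This is an illustration under stated assumptions, not Arrow's own stop token API: `SketchStopSource`, `SketchStopToken` and `RunStep` are hypothetical names, and `RunStep` stands in for either the inspection call or the scan call.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical token: a cheap, copyable view of a shared "stop requested" flag.
class SketchStopToken {
 public:
  explicit SketchStopToken(std::shared_ptr<std::atomic<bool>> flag)
      : flag_(std::move(flag)) {}
  bool IsStopRequested() const { return flag_->load(); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Hypothetical source: its owner (for example a Ctrl-C handler) requests the stop.
class SketchStopSource {
 public:
  SketchStopSource() : flag_(std::make_shared<std::atomic<bool>>(false)) {}
  void RequestStop() { flag_->store(true); }
  SketchStopToken token() const { return SketchStopToken(flag_); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Stand-in for a long-running step (inspection or scan): handles up to
// `num_items`, polling the token before each one, and returns how many it handled.
inline int RunStep(int num_items, const SketchStopToken& token) {
  int handled = 0;
  for (int i = 0; i < num_items; ++i) {
    if (token.IsStopRequested()) break;
    ++handled;
  }
  return handled;
}
```

Because every step polls the same flag, a single `RequestStop()` cancels whichever step is running and any step started later, and no mutex is needed in the steps themselves.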

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -144,7 +144,14 @@ TEST(ExecPlan, DummyStartProducing) {
   // Note that any correct reverse topological order may do
   ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
                                      "source2", "source1"));
-  ASSERT_EQ(t.stopped.size(), 0);
+
+  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       There are some helpful macros for working with futures in `future_util.h`. You might prefer `ASSERT_FINISHES_OK`, so that a bug doesn't hang the test indefinitely.







[GitHub] [arrow] ursabot edited a comment on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874969456


   Benchmark runs are scheduled for baseline = 780e95c512d63bbea1e040af0eb44a0bf63c4d72 and contender = dfb10c0bff4c5f29734abd5a029d0788900d3cd1. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/399dd7ae47d345969dfba70ba5d60fae...54f7624e8abb450e9d9562f9f390b162/)
   [Scheduled] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/eef93c4bfb1d4a8b92351f41883dfd55...7b8d37b925574f93b386a04c6753f0c1/)
   [Skipped :warning: Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/e5af519d38bd4cc483cd45f86750d807...604a6d14b8754e4e98372819b77a4c66/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   





[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r667117812



##########
File path: cpp/src/arrow/testing/matchers.h
##########
@@ -138,40 +179,62 @@ class StatusMatcher {
   }
 
  private:
-  static const Status& GetStatus(const Status& status) { return status; }
+  const StatusCode code_;
+  const util::optional<testing::Matcher<std::string>> message_matcher_;
+};
 
-  template <typename T>
-  static const Status& GetStatus(const Result<T>& maybe_value) {
-    return maybe_value.status();
-  }
+class OkMatcher {
+ public:
+  template <typename Res>
+  operator testing::Matcher<Res>() const {  // NOLINT runtime/explicit
+    struct Impl : testing::MatcherInterface<const Res&> {
+      void DescribeTo(::std::ostream* os) const override { *os << "is ok"; }
 
-  template <typename T>
-  static const Status& GetStatus(const Future<T>& value_fut) {
-    return value_fut.status();
-  }
+      void DescribeNegationTo(::std::ostream* os) const override { *os << "is not ok"; }
 
-  const StatusCode code_;
-  const util::optional<testing::Matcher<std::string>> message_matcher_;
+      bool MatchAndExplain(const Res& maybe_value,
+                           testing::MatchResultListener* listener) const override {
+        const Status& status = internal::GenericToStatus(maybe_value);
+        testing::StringMatchResultListener value_listener;
+
+        const bool match = status.ok();
+        *listener << "whose value " << testing::PrintToString(status.ToString())
+                  << (match ? " matches" : " doesn't match");
+        testing::internal::PrintIfNotEmpty(value_listener.str(), listener->stream());
+        return match;
+      }
+    };
+
+    return testing::Matcher<Res>(new Impl());
+  }
 };
 
-// Returns a matcher that matches the value of a successful Result<T> or Future<T>.
-// (Future<T> will be waited upon to acquire its result for matching.)
+// Returns a matcher that waits on a Future (by default for 16 seconds)

Review comment:
       nit: kDefaultAssertFinishesWaitSeconds seconds?

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -39,11 +39,13 @@ namespace compute {
 namespace {
 
 struct ExecPlanImpl : public ExecPlan {
-  ExecPlanImpl() = default;
+  explicit ExecPlanImpl(ExecContext* exec_context) : ExecPlan(exec_context) {}
 
   ~ExecPlanImpl() override {
-    if (started_ && !stopped_) {
+    if (started_ && !finished_.is_finished()) {

Review comment:
       This is slightly race-prone (a TOCTOU, i.e. time-of-check to time-of-use, issue), right? Though the consequences aren't big here.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       Filed ARROW-13297.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";

Review comment:
       Should we not still return a Status? It might save us from odd bug reports if we error instead of letting internal state get trampled in case of a bug.
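
To illustrate the concern, here is a minimal sketch of the status-returning variant. `SketchStatus` and `SketchSourceNode` are hypothetical stand-ins (not Arrow types); the message mirrors the `Status::Invalid` call removed in the diff above. Unlike a debug-only check, the error is reported in every build type.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status: a success flag plus a message.
struct SketchStatus {
  bool ok;
  std::string message;
};

class SketchSourceNode {
 public:
  explicit SketchSourceNode(std::string label) : label_(std::move(label)) {}

  // Reports a restart as an error instead of relying on a debug-only check
  // that compiles away in release builds.
  SketchStatus StartProducing() {
    if (started_) {
      return SketchStatus{false, "Restarted SourceNode '" + label_ + "'"};
    }
    started_ = true;
    return SketchStatus{true, ""};
  }

 private:
  std::string label_;
  bool started_ = false;
};
```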







[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-875091620


   Now that I'm running it again it just seems that count_rows with a filter got very slow.
   
   ```
   Before:
   Min: 24.285114823000185 s
   Median: 25.309564675000274 s
   Mean: 25.45627587750014 s
   Max: 26.564001424999333 s
   
   After:
   Min: 113.42360810700029 s
   Median: 117.11463808250028 s
   Mean: 118.39458996600005 s
   Max: 123.9104565839998 s
   ```
   
   <details>
   <summary>Test script</summary>
   
   ```python
   import statistics
   import time
   import pyarrow 
   import pyarrow.dataset
   import pyarrow.fs
   
   fs = pyarrow.fs.S3FileSystem(region="us-east-2")
   
   print("PyArrow:", pyarrow.__version__)
   
   times = []
   for _ in range(10):
       start = time.monotonic()
       ds = pyarrow.dataset.dataset([
           "ursa-labs-taxi-data/2009/01/data.parquet",
           "ursa-labs-taxi-data/2009/02/data.parquet",
           "ursa-labs-taxi-data/2009/03/data.parquet",
           "ursa-labs-taxi-data/2009/04/data.parquet",
           ],
           format="parquet",
           partitioning=["year", "month"],
           filesystem=fs,
       )
       print("Rows:", ds.scanner(use_async=True, use_threads=True).count_rows())
       times.append(time.monotonic() - start)
   
   print("No filter")
   print("Min:", min(times), "s")
   print("Median:", statistics.median(times), "s")
   print("Mean:", statistics.mean(times), "s")
   print("Max:", max(times), "s")
   
   print("With filter")
   times = []
   expr = pyarrow.dataset.field("passenger_count") > 1
   for _ in range(10):
       start = time.monotonic()
       ds = pyarrow.dataset.dataset([
           "ursa-labs-taxi-data/2009/01/data.parquet",
           "ursa-labs-taxi-data/2009/02/data.parquet",
           "ursa-labs-taxi-data/2009/03/data.parquet",
           "ursa-labs-taxi-data/2009/04/data.parquet",
           ],
           format="parquet",
           partitioning=["year", "month"],
           filesystem=fs,
       )
       print("Rows:", ds.scanner(use_async=True, use_threads=True, filter=expr).count_rows())
       times.append(time.monotonic() - start)
   print("Min:", min(times), "s")
   print("Median:", statistics.median(times), "s")
   print("Mean:", statistics.mean(times), "s")
   print("Max:", max(times), "s")
   ```
   
   </details>





[GitHub] [arrow] ursabot edited a comment on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874969456


   Benchmark runs are scheduled for baseline = 780e95c512d63bbea1e040af0eb44a0bf63c4d72 and contender = dfb10c0bff4c5f29734abd5a029d0788900d3cd1. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/399dd7ae47d345969dfba70ba5d60fae...54f7624e8abb450e9d9562f9f390b162/)
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/eef93c4bfb1d4a8b92351f41883dfd55...7b8d37b925574f93b386a04c6753f0c1/)
   [Skipped :warning: Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/e5af519d38bd4cc483cd45f86750d807...604a6d14b8754e4e98372819b77a4c66/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   





[GitHub] [arrow] ursabot commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
ursabot commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874969456


   Benchmark runs are scheduled for baseline = 780e95c512d63bbea1e040af0eb44a0bf63c4d72 and contender = dfb10c0bff4c5f29734abd5a029d0788900d3cd1. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Scheduled] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/399dd7ae47d345969dfba70ba5d60fae...54f7624e8abb450e9d9562f9f390b162/)
   [Scheduled] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/eef93c4bfb1d4a8b92351f41883dfd55...7b8d37b925574f93b386a04c6753f0c1/)
   [Skipped :warning: Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/e5af519d38bd4cc483cd45f86750d807...604a6d14b8754e4e98372819b77a4c66/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   





[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r665600038



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -506,73 +444,116 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
         return EnumeratedRecordBatch{record_batch, fragment};
       };
 
-  auto combined_gen = MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
-
-  if (filter_and_project) {
-    return FilterAndProjectRecordBatchAsync(options, std::move(combined_gen));
-  }
-  return combined_gen;
+  return MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
 }
 
 Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
-    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options,
-    bool filter_and_project = true) {
+    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
   auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
   return MakeMappedGenerator(std::move(enumerated_fragment_gen),
                              [=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
-                               return FragmentToBatches(fragment, options,
-                                                        filter_and_project);
+                               return FragmentToBatches(fragment, options);
                              });
 }
 
-Result<AsyncGenerator<AsyncGenerator<util::optional<int64_t>>>> FragmentsToRowCount(
-    FragmentGenerator fragment_gen,
-    std::shared_ptr<ScanOptions> options_with_projection) {
-  // Must use optional<int64_t> to avoid breaking the pipeline on empty batches
-  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
 
-  // Drop projection since we only need to count rows
-  auto options = std::make_shared<ScanOptions>(*options_with_projection);
-  RETURN_NOT_OK(SetProjection(options.get(), std::vector<std::string>()));

Review comment:
       @bkietz CountRows is slower because this is missing from the new implementation :joy: I'm testing with the NYC Taxi dataset, so this is the difference between loading one integer column and loading several gigabytes of data.
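
The effect of dropping the projection can be sketched as column pruning: when only a row count is needed, just the columns referenced by the filter have to be read. `ColumnsNeededForCount` is a hypothetical helper for illustration, not part of the dataset API, and any column names other than `passenger_count` used with it are made up.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper: when only a row count is needed, keep just the columns
// that the filter references; every other column can stay unread.
inline std::vector<std::string> ColumnsNeededForCount(
    const std::vector<std::string>& dataset_columns,
    const std::set<std::string>& filter_columns) {
  std::vector<std::string> needed;
  for (const std::string& name : dataset_columns) {
    if (filter_columns.count(name) > 0) needed.push_back(name);
  }
  return needed;
}
```

With a filter on `passenger_count` alone, the result is that single column, which matches the "one integer column" case described in the comment.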







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666334162



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       Replacing this with a stop token sounds useful, but I'd prefer to handle that in a follow-up.







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666202502



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{
+      nullptr, [plan, exec_context](...) {
+        bool not_finished_yet = plan->finished().TryAddCallback([&] {

Review comment:
       StopProducing is idempotent, and the goal here is only to save unnecessary calls/keepalives if the plan has already finished
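
For reference, one common way to make a stop request idempotent is an atomic exchange on the flag. This is a sketch under that assumption, with a hypothetical `SketchStoppable` class; it is not the code in the PR, which guards `stop_requested_` with a mutex.

```cpp
#include <atomic>
#include <cassert>

class SketchStoppable {
 public:
  // Safe to call any number of times, from any thread; only the call that
  // flips the flag performs the stop work.
  void StopProducing() {
    if (stop_requested_.exchange(true)) return;  // a stop was already requested
    ++stop_work_runs_;
  }

  bool stop_requested() const { return stop_requested_.load(); }
  int stop_work_runs() const { return stop_work_runs_.load(); }

 private:
  std::atomic<bool> stop_requested_{false};
  std::atomic<int> stop_work_runs_{0};
};
```

With this shape, a redundant call from a destructor or from a dropped generator is harmless, so any check made before calling it is only an optimization.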







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664793347



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -729,20 +792,72 @@ Future<std::shared_ptr<Table>> AsyncScanner::ToTableAsync(
   });
 }
 
+namespace {
+Result<int64_t> GetSelectionSize(const Datum& selection, int64_t length) {
+  if (length == 0) return 0;
+
+  if (selection.is_scalar()) {
+    if (!selection.scalar()->is_valid) return 0;
+    if (!selection.scalar_as<BooleanScalar>().value) return 0;
+    return length;
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto count, compute::Sum(selection));
+  return static_cast<int64_t>(count.scalar_as<UInt64Scalar>().value);
+}
+}  // namespace
+
 Result<int64_t> AsyncScanner::CountRows() {
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  ARROW_ASSIGN_OR_RAISE(auto count_gen_gen,
-                        FragmentsToRowCount(std::move(fragment_gen), scan_options_));
-  auto count_gen = MakeConcatenatedGenerator(std::move(count_gen_gen));
-  int64_t total = 0;
-  auto sum_fn = [&total](util::optional<int64_t> count) -> Status {
-    if (count.has_value()) total += *count;
-    return Status::OK();
-  };
-  RETURN_NOT_OK(VisitAsyncGenerator<util::optional<int64_t>>(std::move(count_gen),
-                                                             std::move(sum_fn))
-                    .status());
-  return total;
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+
+  std::atomic<int64_t> total{0};
+
+  fragment_gen = MakeMappedGenerator(
+      std::move(fragment_gen), [&](const std::shared_ptr<Fragment>& fragment) {
+        return fragment->CountRows(scan_options_->filter, scan_options_)
+            .Then([&, fragment](util::optional<int64_t> fast_count) mutable
+                  -> std::shared_ptr<Fragment> {
+              if (fast_count) {
+                // fast path: got row count directly; skip scanning this fragment
+                total += *fast_count;
+                return std::make_shared<OneShotFragment>(
+                    scan_options_->dataset_schema,
+                    MakeEmptyIterator<std::shared_ptr<RecordBatch>>());
+              }
+
+              // slow path: actually filter this fragment's batches
+              return std::move(fragment);
+            });
+      });
+
+  ARROW_ASSIGN_OR_RAISE(auto scan,
+                        MakeScanNode(plan.get(), std::move(fragment_gen), scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto get_selection,
+      compute::MakeProjectNode(scan, "get_selection", {scan_options_->filter}));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(get_selection, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  RETURN_NOT_OK(
+      VisitAsyncGenerator(std::move(sink_gen),
+                          [&](const util::optional<compute::ExecBatch>& batch) {
+                            // TODO replace with scalar aggregation node
+                            ARROW_ASSIGN_OR_RAISE(
+                                int64_t slow_count,
+                                GetSelectionSize(batch->values[0], batch->length));
+                            total += slow_count;
+                            return Status::OK();
+                          })
+          .status());
+
+  plan->finished().Wait();

Review comment:
       Encountered where? ExecPlan::finished() is never marked with an error status. I suppose it'd be fine to check the status anyway
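
For readers skimming the hunk above, the per-batch counting done by `GetSelectionSize` can be sketched without Arrow. This is an illustration under assumptions: `NullableBool`, `Selection` and `SelectionSize` are hypothetical stand-ins for a boolean `Datum`, and null entries are treated as "not selected".

```cpp
#include <cstdint>
#include <optional>
#include <variant>
#include <vector>

// Hypothetical stand-in for a boolean Datum: either one nullable value
// broadcast over the whole batch, or one nullable value per row.
using NullableBool = std::optional<bool>;
using Selection = std::variant<NullableBool, std::vector<NullableBool>>;

// Number of rows kept by `selection` in a batch of `length` rows.
// Null entries count as "not selected".
int64_t SelectionSize(const Selection& selection, int64_t length) {
  if (length == 0) return 0;

  // Scalar case: all rows or none.
  if (const auto* scalar = std::get_if<NullableBool>(&selection)) {
    return scalar->value_or(false) ? length : 0;
  }

  // Array case: count the entries that are both valid and true.
  int64_t count = 0;
  for (const NullableBool& keep : std::get<std::vector<NullableBool>>(selection)) {
    if (keep.value_or(false)) ++count;
  }
  return count;
}
```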







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664745185



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1254,8 +1261,32 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) {
               ResultWith(UnorderedElementsAreArray(expected)));
 }
 
-TEST(ScanNode, ProjectionPushdown) {
-  // ensure non-projected columns are dropped
+TEST(ScanNode, DISABLED_ProjectionPushdown) {

Review comment:
       Correct, I'll add the JIRA number here once I file the appropriate follow-up.







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666331058



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,6 +272,8 @@ class ConcreteFutureImpl : public FutureImpl {
         return true;
       case ShouldSchedule::IfUnfinished:
         return !in_add_callback;
+      case ShouldSchedule::IfDifferentExecutor:
+        return !callback_record.options.executor->OwnsThisThread();

Review comment:
       I use it in SourceNode
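
A rough sketch of what an "only hop threads if we are not already on the target executor" policy decides. This is a simplification under assumptions, not Arrow's implementation: `ToyExecutor` and `MustSchedule` are hypothetical, and the `IfUnfinished` branch is reduced to a single flag.

```cpp
#include <thread>

// Hypothetical single-thread executor: it "owns" exactly one thread id.
struct ToyExecutor {
  std::thread::id worker;
  bool OwnsThisThread() const { return std::this_thread::get_id() == worker; }
};

enum class ShouldSchedule { Never, IfUnfinished, Always, IfDifferentExecutor };

// True if the callback should be queued on `executor` instead of running
// inline on the calling thread.
bool MustSchedule(ShouldSchedule policy, const ToyExecutor& executor,
                  bool future_unfinished) {
  switch (policy) {
    case ShouldSchedule::Never:
      return false;
    case ShouldSchedule::Always:
      return true;
    case ShouldSchedule::IfUnfinished:
      return future_unfinished;
    case ShouldSchedule::IfDifferentExecutor:
      // Already on the desired executor: run inline and skip the queue.
      return !executor.OwnsThisThread();
  }
  return false;
}
```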







[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664794191



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -729,20 +792,72 @@ Future<std::shared_ptr<Table>> AsyncScanner::ToTableAsync(
   });
 }
 
+namespace {
+Result<int64_t> GetSelectionSize(const Datum& selection, int64_t length) {
+  if (length == 0) return 0;
+
+  if (selection.is_scalar()) {
+    if (!selection.scalar()->is_valid) return 0;
+    if (!selection.scalar_as<BooleanScalar>().value) return 0;
+    return length;
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto count, compute::Sum(selection));
+  return static_cast<int64_t>(count.scalar_as<UInt64Scalar>().value);
+}
+}  // namespace
+
 Result<int64_t> AsyncScanner::CountRows() {
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  ARROW_ASSIGN_OR_RAISE(auto count_gen_gen,
-                        FragmentsToRowCount(std::move(fragment_gen), scan_options_));
-  auto count_gen = MakeConcatenatedGenerator(std::move(count_gen_gen));
-  int64_t total = 0;
-  auto sum_fn = [&total](util::optional<int64_t> count) -> Status {
-    if (count.has_value()) total += *count;
-    return Status::OK();
-  };
-  RETURN_NOT_OK(VisitAsyncGenerator<util::optional<int64_t>>(std::move(count_gen),
-                                                             std::move(sum_fn))
-                    .status());
-  return total;
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+
+  std::atomic<int64_t> total{0};
+
+  fragment_gen = MakeMappedGenerator(
+      std::move(fragment_gen), [&](const std::shared_ptr<Fragment>& fragment) {
+        return fragment->CountRows(scan_options_->filter, scan_options_)
+            .Then([&, fragment](util::optional<int64_t> fast_count) mutable
+                  -> std::shared_ptr<Fragment> {
+              if (fast_count) {
+                // fast path: got row count directly; skip scanning this fragment
+                total += *fast_count;
+                return std::make_shared<OneShotFragment>(
+                    scan_options_->dataset_schema,
+                    MakeEmptyIterator<std::shared_ptr<RecordBatch>>());
+              }
+
+              // slow path: actually filter this fragment's batches
+              return std::move(fragment);
+            });
+      });
+
+  ARROW_ASSIGN_OR_RAISE(auto scan,
+                        MakeScanNode(plan.get(), std::move(fragment_gen), scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto get_selection,
+      compute::MakeProjectNode(scan, "get_selection", {scan_options_->filter}));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(get_selection, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  RETURN_NOT_OK(
+      VisitAsyncGenerator(std::move(sink_gen),
+                          [&](const util::optional<compute::ExecBatch>& batch) {
+                            // TODO replace with scalar aggregation node
+                            ARROW_ASSIGN_OR_RAISE(
+                                int64_t slow_count,
+                                GetSelectionSize(batch->values[0], batch->length));
+                            total += slow_count;
+                            return Status::OK();
+                          })
+          .status());
+
+  plan->finished().Wait();

Review comment:
       Ah sorry, I didn't realize this can't fail.







[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664679484



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +654,93 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  // Ensure plan, exec_context outlive usage of the returned generator
+  plans_.emplace(plan, std::move(exec_context));

Review comment:
       In other places we wrap the generator with a generator that holds a reference to whatever resources are necessary - why doesn't that work here?
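
The wrapping approach mentioned here can be sketched as follows. This is a minimal illustration with a synchronous toy generator rather than Arrow's `AsyncGenerator`. `Generator` and `WithKeepalive` are hypothetical names.

```cpp
#include <functional>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical synchronous generator: returns nullopt once exhausted.
template <typename T>
using Generator = std::function<std::optional<T>()>;

// Wraps `inner` so that the returned generator shares ownership of
// `resource`. The resource is released when the wrapper (and every copy
// of it) is destroyed.
template <typename T, typename Resource>
Generator<T> WithKeepalive(Generator<T> inner, std::shared_ptr<Resource> resource) {
  return [inner = std::move(inner), resource = std::move(resource)]() mutable {
    return inner();
  };
}
```

With this shape, the resource lifetime follows the generator rather than the object that created it, so nothing needs to be stored on the scanner.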

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship

Review comment:
       Is this worth filing a follow-up for? I'd guess a lot of scanner usage is already single-use.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship
+  std::unordered_map<std::shared_ptr<compute::ExecPlan>,
+                     std::shared_ptr<compute::ExecContext>>
+      plans_;

Review comment:
       This is a little questionable in that I don't think the lifetime of these generators was tied to the lifetime of the scanner before. Maybe we should also keep a reference to the scanner from the generators?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, /*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
+
+  auto merged_batch_gen = MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may be
+        // unnecessarily materialized columns in batch. We can drop them now instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches than this,
+        // for example parquet's row group stats. Failing to do this leaves perf on the
+        // table because row group stats could be used to skip kernel execs in FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       It is a little unfortunate that we're making several heap allocations for every batch to carry effectively three integers (though I guess the overhead is probably not noticeable in the grand scheme of things and batches should be relatively large).

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +654,93 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  // Ensure plan, exec_context outlive usage of the returned generator
+  plans_.emplace(plan, std::move(exec_context));

Review comment:
       I guess - we might drop the outermost generator but not necessarily the whole graph of generators?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1254,8 +1261,32 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) {
               ResultWith(UnorderedElementsAreArray(expected)));
 }
 
-TEST(ScanNode, ProjectionPushdown) {
-  // ensure non-projected columns are dropped
+TEST(ScanNode, DISABLED_ProjectionPushdown) {

Review comment:
       Disabled because of that TODO above?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, /*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
+
+  auto merged_batch_gen = MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may be
+        // unnecessarily materialized columns in batch. We can drop them now instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches than this,
+        // for example parquet's row group stats. Failing to do this leaves perf on the
+        // table because row group stats could be used to skip kernel execs in FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       I guess if it is ever noticeable overhead, we could make the booleans singletons and pool the integer allocations.
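
A sketch of the "boolean singletons" idea, assuming a hypothetical `ToyBoolScalar` type rather than Arrow's scalars. Pooling the integer scalars is not shown.

```cpp
#include <memory>

// Hypothetical minimal scalar type (not Arrow's BooleanScalar).
struct ToyBoolScalar {
  bool value;
};

// Returns one of two process-wide instances instead of allocating a new
// scalar for every batch.
const std::shared_ptr<const ToyBoolScalar>& SharedBool(bool value) {
  static const auto kTrue =
      std::make_shared<const ToyBoolScalar>(ToyBoolScalar{true});
  static const auto kFalse =
      std::make_shared<const ToyBoolScalar>(ToyBoolScalar{false});
  return value ? kTrue : kFalse;
}
```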







[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-875082987


   Ok, actually, I'm not sure if it was stuck or just me being impatient…





[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-875823292


   If we correct the CountRows issue above, CountRows indeed gets much faster!
   
   ```
   Before:
   
   Min: 24.285114823000185 s
   Median: 25.309564675000274 s
   Mean: 25.45627587750014 s
   Max: 26.564001424999333 s
   
   After:
   
   Min: 6.617772240000249 s
   Median: 6.906719556000098 s
   Mean: 6.926710154500052 s
   Max: 7.311853507000251 s
   ```
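
To put the timings above in relative terms, the medians work out to a bit over 3.6x faster. A small sketch of the arithmetic (the function and constant names are illustrative only):

```cpp
// Median timings copied from the figures quoted above, in seconds.
constexpr double kMedianBefore = 25.309564675000274;
constexpr double kMedianAfter = 6.906719556000098;

// Ratio of the old timing to the new one; values above 1 mean "faster".
constexpr double Speedup(double before_seconds, double after_seconds) {
  return before_seconds / after_seconds;
}
```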





[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r665622977



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -506,73 +444,116 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
         return EnumeratedRecordBatch{record_batch, fragment};
       };
 
-  auto combined_gen = MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
-
-  if (filter_and_project) {
-    return FilterAndProjectRecordBatchAsync(options, std::move(combined_gen));
-  }
-  return combined_gen;
+  return MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
 }
 
 Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
-    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options,
-    bool filter_and_project = true) {
+    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
   auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
   return MakeMappedGenerator(std::move(enumerated_fragment_gen),
                              [=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
-                               return FragmentToBatches(fragment, options,
-                                                        filter_and_project);
+                               return FragmentToBatches(fragment, options);
                              });
 }
 
-Result<AsyncGenerator<AsyncGenerator<util::optional<int64_t>>>> FragmentsToRowCount(
-    FragmentGenerator fragment_gen,
-    std::shared_ptr<ScanOptions> options_with_projection) {
-  // Must use optional<int64_t> to avoid breaking the pipeline on empty batches
-  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
 
-  // Drop projection since we only need to count rows
-  auto options = std::make_shared<ScanOptions>(*options_with_projection);
-  RETURN_NOT_OK(SetProjection(options.get(), std::vector<std::string>()));

Review comment:
       @lidavidm thanks for the fix!







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666199751



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer if not.
-                // This can happen for in-memory scans where batches didn't require
-                // any CPU work to decode. Otherwise, parsing etc should have already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {
+                  std::unique_lock<std::mutex> lock(mutex_);
+                  int seq = batch_count_++;
+                  if (stop_requested_) {
+                    return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+                  }
                   lock.unlock();
-                  // unless we were already finished, push the error to our output
-                  // XXX is this correct? Is it reasonable for a consumer to
-                  // ignore errors from a finished producer?
-                  outputs_[0]->ErrorReceived(this, error);
-                }
-                return Break(seq);
-              });
-        }).Then([&](int seq) {
-          /// XXX this is probably redundant: do we always call InputFinished after
-          /// ErrorReceived or will ErrorRecieved be sufficient?
-          outputs_[0]->InputFinished(this, seq);
-        });
+
+                  return generator_().Then(
+                      [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
+                        std::unique_lock<std::mutex> lock(mutex_);
+                        if (!batch || stop_requested_) {

Review comment:
       will do







[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874920743


   (Looks like we need to rebase before the Conbench benchmarks can run.)





[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664744956



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship

Review comment:
       I'll write a follow-up JIRA. Many of the unit tests assume a scanner can be reused, and it'll be a change in behavior in the Python API too, since we expose Scanner there.







[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-875007564


   From Conbench, it seems like performance isn't really affected. I'll manually run those benchmarks with more iterations as well.





[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-875067173


   This got stuck while I was testing it (I'll try to get a backtrace shortly). But performance indeed seems to be on par with before.
   
   <details>
   
   4.0.1:
   ```
           "iterations": 10,
           "max": "10.501172",
           "mean": "9.783475",
           "median": "9.783779",
           "min": "9.203580",
           "q1": "9.451842",
           "q3": "10.074056",
   ```
   
   5.0.0 master
   ```
           "iterations": 10,
           "max": "12.266335",
           "mean": "10.373638",
           "median": "10.223064",
           "min": "9.785621",
           "q1": "10.084575",
           "q3": "10.322508",
   ```
   Count rows, no filter, 10 iterations:
   ```
   Min: 0.39461345200015785 s
   Median: 0.420534689500073 s
   Mean: 0.4404416943999422 s
   Max: 0.5733599779996439 s
   ```
   
   This PR
   ```
           "iterations": 10,
           "max": "11.053488",
           "mean": "10.406635",
           "median": "10.273359",
           "min": "9.980684",
           "q1": "10.145428",
           "q3": "10.701346",
   ```
   Count rows, no filter, 10 iterations:
   ```
   Min: 0.3555871419994219 s
   Median: 0.38891541300017707 s
   Mean: 0.4031996126002014 s
   Max: 0.5227700120003647 s
   ```
   (It got stuck when a filter was applied)
   
   </details>





[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664746139



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, /*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
+
+  auto merged_batch_gen = MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may be
+        // unnecessarily materialized columns in batch. We can drop them now instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches than this,
+        // for example parquet's row group stats. Failing to do this leaves perf on the
+        // table because row group stats could be used to skip kernel execs in FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       See also https://issues.apache.org/jira/browse/ARROW-8928







[GitHub] [arrow] westonpace commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666505229



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{

Review comment:
       I don't know that it's worthy of a JIRA on its own; it was more just an observation. I'm fine with this.







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666200686



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;

Review comment:
       It would work, but when a batch is already on the desired executor (because a Fragment has already seeded parallelism) there's no need to schedule, and doing so could lose thread locality.
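
       To illustrate the "schedule only if on a different executor" idea, here is a minimal single-threaded sketch. `ToyExecutor` and `RunIfDifferentExecutor` are hypothetical names made up for this example; they are not Arrow's `Executor` or `CallbackOptions`, and the real mechanism may differ in detail.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy executor (hypothetical, not arrow::internal::Executor): tasks are
// queued and only run when Drain() is called.
class ToyExecutor {
 public:
  void Submit(std::function<void()> task) { queue_.push_back(std::move(task)); }

  // Runs queued tasks, including any that are queued while draining.
  void Drain() {
    ToyExecutor* previous = current_;
    current_ = this;
    while (!queue_.empty()) {
      std::function<void()> task = std::move(queue_.front());
      queue_.pop_front();
      task();
    }
    current_ = previous;
  }

  // The executor whose task is currently running on this thread, or nullptr.
  static ToyExecutor* Current() { return current_; }

 private:
  std::deque<std::function<void()>> queue_;
  static thread_local ToyExecutor* current_;
};

thread_local ToyExecutor* ToyExecutor::current_ = nullptr;

// Run inline when already on `target` (keeps the work on the current thread);
// otherwise hand the task over to `target`.
void RunIfDifferentExecutor(ToyExecutor* target, std::function<void()> task) {
  if (ToyExecutor::Current() == target) {
    task();
  } else {
    target->Submit(std::move(task));
  }
}
```

       In this sketch a task submitted from outside the executor is queued, while a continuation issued from a task already running on that executor runs immediately, without a trip through the queue.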







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664743485



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship
+  std::unordered_map<std::shared_ptr<compute::ExecPlan>,
+                     std::shared_ptr<compute::ExecContext>>
+      plans_;

Review comment:
       Well, I originally did this because the destruction of ExecPlan was waiting for the plan to finish, blocking the only thread which could finish the plan. However, removing it doesn't seem to reproduce the deadlock, so I guess `stop_producing` is a sufficient keepalive.
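
       For readers unfamiliar with the pattern, a `std::shared_ptr<void>` with a custom deleter can act both as a keepalive and as an "on last copy destroyed" hook. The sketch below is only an illustration under assumptions: `FakePlan` and `MakeGuardedGenerator` are hypothetical stand-ins, not the code in this PR.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for an execution plan; not Arrow's ExecPlan.
struct FakePlan {
  bool stopped = false;
  void StopProducing() { stopped = true; }
};

// Returns a generator that keeps `plan` alive for as long as any copy of the
// generator exists, and calls StopProducing() when the last copy is destroyed.
std::function<int()> MakeGuardedGenerator(std::shared_ptr<FakePlan> plan) {
  // The guard owns no object; its deleter still runs when the last copy of
  // the guard is destroyed. The deleter's capture is what keeps `plan` alive.
  std::shared_ptr<void> stop_guard(nullptr,
                                   [plan](void*) { plan->StopProducing(); });
  int next = 0;
  return [stop_guard, next]() mutable { return next++; };
}
```

       Because the deleter captures the plan by value, the plan cannot be destroyed before the generator is, which is the keepalive effect described above.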







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664763702



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship

Review comment:
       https://issues.apache.org/jira/browse/ARROW-13264







[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666203110



##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -65,16 +61,24 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
 
   Status Validate();
 
-  /// Start producing on all nodes
+  /// \brief Start producing on all nodes
   ///
   /// Nodes are started in reverse topological order, such that any node
   /// is started before all of its inputs.
   Status StartProducing();
 
+  /// \brief Stop producing on all nodes
+  ///
+  /// Nodes are stopped topological order, such that any node

Review comment:
       thanks







[GitHub] [arrow] lidavidm commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874887015


   I'll test this on EC2 when I get a chance and compare to 4.0/current master.





[GitHub] [arrow] ursabot edited a comment on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-874888474


   Benchmark runs are scheduled for baseline = 780e95c512d63bbea1e040af0eb44a0bf63c4d72 and contender = c1808f9156bd4df03cf8338217ffd93f519abb96. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Failed] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/399dd7ae47d345969dfba70ba5d60fae...153e1fbed2d04daebb6acdf1c2389903/)
   [Failed] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/eef93c4bfb1d4a8b92351f41883dfd55...6f6c2fcd5d0046bd8e70799566145aaa/)
   [Skipped :warning: Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/e5af519d38bd4cc483cd45f86750d807...12bcd567e1074dcbb6aa56f22219c4de/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   





[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666332778



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{

Review comment:
       We can add that in a follow-up if necessary; otherwise I'd prefer to avoid adding complexity to `async_generator.h` for exclusive use here.







[GitHub] [arrow] bkietz closed pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz closed pull request #10664:
URL: https://github.com/apache/arrow/pull/10664


   





[GitHub] [arrow] westonpace commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666505934



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer if not.
-                // This can happen for in-memory scans where batches didn't require
-                // any CPU work to decode. Otherwise, parsing etc should have already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {

Review comment:
       No reason other than readability.  The visitor would just be using Loop under the hood.
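
       As a rough illustration of that point, here is a simplified, synchronous sketch of a visitor helper built on a loop primitive. `ControlFlow`, `Loop`, `Generator` and `VisitGenerator` below are stand-ins written for this example (assumptions, not Arrow's future-based utilities).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Simplified, synchronous stand-ins (assumptions for illustration only).
struct ControlFlow {
  bool should_break;
  int break_value;
};
inline ControlFlow Continue() { return {false, 0}; }
inline ControlFlow Break(int value) { return {true, value}; }

// Calls `body` repeatedly until it asks to break; returns the break value.
int Loop(const std::function<ControlFlow()>& body) {
  for (;;) {
    ControlFlow flow = body();
    if (flow.should_break) return flow.break_value;
  }
}

// A generator writes the next item to `out`, or returns false when exhausted.
using Generator = std::function<bool(int* out)>;

// Visitor helper expressed in terms of Loop; returns the number of items seen.
int VisitGenerator(Generator gen, const std::function<void(int)>& visitor) {
  int count = 0;
  return Loop([&]() -> ControlFlow {
    int item = 0;
    if (!gen(&item)) return Break(count);
    visitor(item);
    ++count;
    return Continue();
  });
}
```

       The visitor form hides the Break/Continue bookkeeping from the caller, which is the readability argument; the control flow underneath is the same loop.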







[GitHub] [arrow] bkietz commented on pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#issuecomment-875698291


   Well, that's disappointing. I was actually expecting it to get faster.





[GitHub] [arrow] bkietz commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666329817



##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -144,7 +144,14 @@ TEST(ExecPlan, DummyStartProducing) {
   // Note that any correct reverse topological order may do
   ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
                                      "source2", "source1"));
-  ASSERT_EQ(t.stopped.size(), 0);
+
+  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       I've updated the tests to reuse the default wait

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -144,7 +144,14 @@ TEST(ExecPlan, DummyStartProducing) {
   // Note that any correct reverse topological order may do
   ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
                                      "source2", "source1"));
-  ASSERT_EQ(t.stopped.size(), 0);
+
+  plan->StopProducing();
+  plan->finished().Wait();
+  ASSERT_THAT(t.stopped, ElementsAre("source1", "source2", "process1", "process2",

Review comment:
       I'll duplicate the comment







[GitHub] [arrow] westonpace commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666506838



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       A follow-up sounds fine to me.



