You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/06 19:28:13 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664822735



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -506,73 +444,116 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
         return EnumeratedRecordBatch{record_batch, fragment};
       };
 
-  auto combined_gen = MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
-
-  if (filter_and_project) {
-    return FilterAndProjectRecordBatchAsync(options, std::move(combined_gen));
-  }
-  return combined_gen;
+  return MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
 }
 
 Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
-    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options,
-    bool filter_and_project = true) {
+    FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
   auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
   return MakeMappedGenerator(std::move(enumerated_fragment_gen),
                              [=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
-                               return FragmentToBatches(fragment, options,
-                                                        filter_and_project);
+                               return FragmentToBatches(fragment, options);
                              });
 }
 
-Result<AsyncGenerator<AsyncGenerator<util::optional<int64_t>>>> FragmentsToRowCount(
-    FragmentGenerator fragment_gen,
-    std::shared_ptr<ScanOptions> options_with_projection) {
-  // Must use optional<int64_t> to avoid breaking the pipeline on empty batches
-  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
 
-  // Drop projection since we only need to count rows
-  auto options = std::make_shared<ScanOptions>(*options_with_projection);
-  RETURN_NOT_OK(SetProjection(options.get(), std::vector<std::string>()));
-
-  auto count_fragment_fn =
-      [options](const Enumerated<std::shared_ptr<Fragment>>& fragment)
-      -> Result<AsyncGenerator<util::optional<int64_t>>> {
-    auto count_fut = fragment.value->CountRows(options->filter, options);
-    return MakeFromFuture(
-        count_fut.Then([=](util::optional<int64_t> val)
-                           -> Result<AsyncGenerator<util::optional<int64_t>>> {
-          // Fast path
-          if (val.has_value()) {
-            return MakeSingleFutureGenerator(
-                Future<util::optional<int64_t>>::MakeFinished(val));
-          }
-          // Slow path
-          ARROW_ASSIGN_OR_RAISE(auto batch_gen, FragmentToBatches(fragment, options));
-          auto count_fn =
-              [](const EnumeratedRecordBatch& enumerated) -> util::optional<int64_t> {
-            return enumerated.record_batch.value->num_rows();
-          };
-          return MakeMappedGenerator(batch_gen, std::move(count_fn));
-        }));
-  };
-  return MakeMappedGenerator(std::move(enumerated_fragment_gen),
-                             std::move(count_fragment_fn));
-}
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(std::move(fragment_gen), options));
 
-Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsyncImpl(
-    const std::shared_ptr<ScanOptions>& options, FragmentGenerator fragment_gen,
-    internal::Executor* cpu_executor, bool filter_and_project = true) {
-  ARROW_ASSIGN_OR_RAISE(
-      auto batch_gen_gen,
-      FragmentsToBatches(std::move(fragment_gen), options, filter_and_project));
-  auto batch_gen_gen_readahead =
-      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);

Review comment:
      Ben and I discussed this. The serial readahead wasn't actually adding anything, because `batch_gen_gen` only lists fragments. It was also invalid: a serial readahead generator is not async-reentrant, so it should not be consumed by `MakeMergedGenerator`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org