Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/06 16:02:11 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #10664: ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans

lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664679484



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +654,93 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  // Ensure plan, exec_context outlive usage of the returned generator
+  plans_.emplace(plan, std::move(exec_context));

Review comment:
       In other places we wrap the generator with a generator that holds a reference to whatever resources are necessary - why doesn't that work here?
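
A minimal sketch of the wrapping pattern being referenced (illustrative names, not the PR's code; it only assumes that Arrow's AsyncGenerator<T> is a std::function<Future<T>()>):

    template <typename T>
    AsyncGenerator<T> MakeOwningGenerator(AsyncGenerator<T> source,
                                          std::shared_ptr<compute::ExecPlan> plan,
                                          std::shared_ptr<compute::ExecContext> context) {
      // The lambda copies the shared_ptrs, so the plan and the context stay
      // alive for as long as any copy of the returned generator does.
      return [source = std::move(source), plan = std::move(plan),
              context = std::move(context)]() { return source(); };
    }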

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship

Review comment:
       Is this worth filing a follow-up for? I'd guess a lot of scanner usage is already single-use.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship
+  std::unordered_map<std::shared_ptr<compute::ExecPlan>,
+                     std::shared_ptr<compute::ExecContext>>
+      plans_;

Review comment:
       This is a little questionable in that I don't think the lifetime of these generators was tied to the lifetime of the scanner before. Maybe we should also keep a reference to the scanner from the generators?
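
A hedged sketch of that suggestion, assuming the scanner can hand out a shared_ptr to itself (e.g. via std::enable_shared_from_this); `gen` stands for whatever generator the method is about to return:

    // Inside AsyncScanner::ScanBatchesUnorderedAsync, just before returning:
    auto self = shared_from_this();  // assumption: AsyncScanner supports this
    EnumeratedRecordBatchGenerator wrapped = [self, gen = std::move(gen)]() {
      // `self` keeps the scanner (and therefore plans_) alive for as long as
      // the returned generator is in use.
      return gen();
    };
    return wrapped;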

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, /*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
+
+  auto merged_batch_gen = MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may be
+        // unnecessarily materialized columns in batch. We can drop them now instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches than this,
+        // for example parquet's row group stats. Failing to do this leaves perf on the
+        // table because row group stats could be used to skip kernel execs in FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       It is a little unfortunate that we're making several heap allocations for every batch to carry effectively three integers (though I guess the overhead is probably not noticeable in the grand scheme of things and batches should be relatively large).
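
To make the overhead concrete, this is roughly what those three appends do under the hood (a sketch assuming Datum's value-wrapping constructors, not the literal implementation):

    // Each plain value passed to emplace_back goes through a Datum constructor
    // that heap-allocates a scalar wrapper, approximately:
    batch->values.emplace_back(
        Datum(std::make_shared<Int32Scalar>(partial.fragment.index)));
    batch->values.emplace_back(
        Datum(std::make_shared<Int32Scalar>(partial.record_batch.index)));
    batch->values.emplace_back(
        Datum(std::make_shared<BooleanScalar>(partial.record_batch.last)));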

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +654,93 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  // Ensure plan, exec_context outlive usage of the returned generator
+  plans_.emplace(plan, std::move(exec_context));

Review comment:
       I guess - we might drop the outermost generator but not necessarily the whole graph of generators?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1254,8 +1261,32 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) {
               ResultWith(UnorderedElementsAreArray(expected)));
 }
 
-TEST(ScanNode, ProjectionPushdown) {
-  // ensure non-projected columns are dropped
+TEST(ScanNode, DISABLED_ProjectionPushdown) {

Review comment:
       Disabled because of that TODO above?
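
For the record, a note on the mechanics (separate from the question of intent): GoogleTest's DISABLED_ prefix keeps the test compiling but skips it by default. A minimal illustration, not the PR's test body:

    #include <gtest/gtest.h>

    // The DISABLED_ prefix registers the test but skips it at runtime; it can
    // still be forced to run with --gtest_also_run_disabled_tests (optionally
    // narrowed with --gtest_filter=ScanNode.DISABLED_ProjectionPushdown).
    TEST(ScanNode, DISABLED_ProjectionPushdown) {
      // illustrative empty body
    }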

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, /*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
+
+  auto merged_batch_gen = MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may be
+        // unnecessarily materialized columns in batch. We can drop them now instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches than this,
+        // for example parquet's row group stats. Failing to do this leaves perf on the
+        // table because row group stats could be used to skip kernel execs in FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       I guess if it is ever noticeable overhead, we could make the booleans singletons and pool the integer allocations.
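
A minimal sketch of the singleton half of that idea (hypothetical helper, not code from the PR):

    // Reuse two shared BooleanScalar instances instead of allocating a new
    // one for every batch's "last" tag.
    const std::shared_ptr<Scalar>& BooleanSingleton(bool value) {
      static const std::shared_ptr<Scalar> kTrue =
          std::make_shared<BooleanScalar>(true);
      static const std::shared_ptr<Scalar> kFalse =
          std::make_shared<BooleanScalar>(false);
      return value ? kTrue : kFalse;
    }

    // ...and in the mapping lambda:
    // batch->values.emplace_back(BooleanSingleton(partial.record_batch.last));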




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org