You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/04 18:07:29 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #10060: ARROW-9697: [C++][Python][R][Dataset] Add CountRows for Scanner

bkietz commented on a change in pull request #10060:
URL: https://github.com/apache/arrow/pull/10060#discussion_r625986262



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -488,10 +507,37 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
   return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
       std::move(enumerated_fragment_gen),
       [scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
-        return FragmentToBatches(scanner, fragment);
+        return FragmentToBatches(scanner, fragment, scanner->options());
       });
 }
 
+Result<AsyncGenerator<AsyncGenerator<util::optional<int64_t>>>> FragmentsToRowCount(
+    std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+  // Must use optional<int64_t> to avoid breaking the pipeline on empty batches
+  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+  auto options = std::make_shared<ScanOptions>(*scanner->options());
+  RETURN_NOT_OK(SetProjection(options.get(), std::vector<std::string>()));
+  auto count_fragment_fn =
+      [scanner, options](const Enumerated<std::shared_ptr<Fragment>>& fragment)
+      -> Result<AsyncGenerator<util::optional<int64_t>>> {
+    auto count = fragment.value->CountRows(options->filter, options);
+    // Fast path
+    if (count.has_value()) {
+      return MakeSingleFutureGenerator(count.value().Then(
+          [](int64_t val) { return util::make_optional<int64_t>(val); }));

Review comment:
       ```suggestion
         return MakeSingleFutureGenerator(count->Then(util::make_optional<int64_t>));
   ```

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -169,6 +174,15 @@ Result<RecordBatchGenerator> FileFragment::ScanBatchesAsync(
   return format_->ScanBatchesAsync(options, self);
 }
 
+util::optional<Future<int64_t>> FileFragment::CountRows(
+    Expression predicate, std::shared_ptr<ScanOptions> options) {
+  ARROW_ASSIGN_OR_RAISE(
+      predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
+  if (!predicate.IsSatisfiable()) return Future<int64_t>::MakeFinished(0);
+  auto self = internal::checked_pointer_cast<FileFragment>(shared_from_this());
+  return format()->CountRows(self, predicate, std::move(options));

Review comment:
       ```suggestion
     return format()->CountRows(self, std::move(predicate), std::move(options));
   ```

##########
File path: cpp/src/arrow/dataset/file_ipc.cc
##########
@@ -173,6 +173,25 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
   return IpcScanTaskIterator::Make(options, fragment);
 }
 
+util::optional<Future<int64_t>> IpcFileFormat::CountRows(
+    const std::shared_ptr<FileFragment>& file, Expression predicate,
+    std::shared_ptr<ScanOptions> options) {
+  ARROW_ASSIGN_OR_RAISE(predicate, SimplifyWithGuarantee(std::move(predicate),
+                                                         file->partition_expression()));
+  if (!predicate.IsSatisfiable()) {
+    return Future<int64_t>::MakeFinished(0);
+  } else if (FieldsInExpression(predicate).size() > 0) {

Review comment:
       Nit: the `else` is redundant, since the preceding branch returns
   ```suggestion
     }
     
     if (FieldsInExpression(predicate).size() > 0) {
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -385,6 +385,23 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return MakeVectorIterator(std::move(tasks));
 }
 
+util::optional<Future<int64_t>> ParquetFileFormat::CountRows(
+    const std::shared_ptr<FileFragment>& file, Expression predicate,
+    std::shared_ptr<ScanOptions> options) {
+  auto parquet_file = internal::checked_pointer_cast<ParquetFileFragment>(file);
+  if (FieldsInExpression(predicate).size() > 0) {

Review comment:
       Would it be worthwhile to try pushing the predicate down to row groups here? If every row group happens to pass or fail the filter as a whole, we could recover the metadata-only fast path (bailing out if any row group requires non-trivial filtering).

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -169,6 +174,15 @@ Result<RecordBatchGenerator> FileFragment::ScanBatchesAsync(
   return format_->ScanBatchesAsync(options, self);
 }
 
+util::optional<Future<int64_t>> FileFragment::CountRows(
+    Expression predicate, std::shared_ptr<ScanOptions> options) {
+  ARROW_ASSIGN_OR_RAISE(
+      predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
+  if (!predicate.IsSatisfiable()) return Future<int64_t>::MakeFinished(0);

Review comment:
       Nit: I think this is long enough to warrant braces
   ```suggestion
     if (!predicate.IsSatisfiable()) {
       return Future<int64_t>::MakeFinished(0);
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org