Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/13 03:41:13 UTC

[GitHub] [arrow] westonpace opened a new pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

westonpace opened a new pull request #10008:
URL: https://github.com/apache/arrow/pull/10008


   WIP


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#issuecomment-818601902


   https://issues.apache.org/jira/browse/ARROW-12289





[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615039360



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -318,6 +337,28 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner {
   std::shared_ptr<Fragment> fragment_;
 };
 
+class ARROW_DS_EXPORT AsyncScanner : public Scanner,

Review comment:
       Ah, that's a good idea.  I can move both the scanners to `.cc`.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615108169



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -166,13 +169,29 @@ class ARROW_DS_EXPORT ScanTask {
   std::shared_ptr<Fragment> fragment_;
 };
 
-template <typename T>
-struct Enumerated {
-  T value;
-  int index;
-  bool last;
+/// \brief A trivial ScanTask that yields the RecordBatch of an array.

Review comment:
       Well, for clarity's sake, I just moved them back to where you had them.  I'll leave internal headers for a future PR to keep this one at a reasonable size.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615107914



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -244,6 +244,8 @@ Status ScannerBuilder::UseThreads(bool use_threads) {
   return Status::OK();
 }
 
+void ScannerBuilder::UseAsync(bool use_async) { scan_options_->use_async = use_async; }

Review comment:
       Fair enough.  I changed it to return a Status.
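To illustrate the change described here, the following is a minimal sketch that assumes a stand-in `Status` class and simplified `ScanOptions` and `ScannerBuilder` types. None of them are the real Arrow definitions; the point is only that `UseAsync` returns a `Status` in the same way `UseThreads` does in the hunk above.

```cpp
#include <cassert>
#include <memory>

// Stand-in for arrow::Status, for illustration only.
class Status {
 public:
  static Status OK() { return Status(true); }
  bool ok() const { return ok_; }

 private:
  explicit Status(bool ok) : ok_(ok) {}
  bool ok_;
};

// Simplified stand-in for the options the builder mutates.
struct ScanOptions {
  bool use_threads = false;
  bool use_async = false;
};

class ScannerBuilder {
 public:
  ScannerBuilder() : scan_options_(std::make_shared<ScanOptions>()) {}

  Status UseThreads(bool use_threads) {
    scan_options_->use_threads = use_threads;
    return Status::OK();
  }

  // Returns Status like UseThreads, so every setter is called the same way.
  Status UseAsync(bool use_async) {
    scan_options_->use_async = use_async;
    return Status::OK();
  }

  const ScanOptions& options() const { return *scan_options_; }

 private:
  std::shared_ptr<ScanOptions> scan_options_;
};
```

One plausible benefit of this shape is that validation could be added to the setter later without changing its signature.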







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613553173



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Well, technically speaking, the number of batches per fragment depends on both this and the max batch size.  So I suppose we could have gotten sufficient testing by setting the max batch size small enough.  At the moment these tests help exercise the SyncScanner if nothing else.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r612491223



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -36,8 +36,20 @@ constexpr int64_t kNumberChildDatasets = 2;
 constexpr int64_t kNumberBatches = 16;
 constexpr int64_t kBatchSize = 1024;
 
-class TestScanner : public DatasetFixtureMixin {
+struct PrintIsAsyncParam {
+  std::string operator()(::testing::TestParamInfo<bool> info) {
+    if (info.param) {
+      return "async";
+    } else {
+      return "sync";
+    }
+  }
+};
+
+class TestScanner : public DatasetFixtureMixinWithParam<bool> {

Review comment:
       Ah, yes.  I was planning on adding whether to scan with `Scan` (to ensure we still test the legacy path), `ScanBatches`, or `ScanBatchesUnordered` as a parameter as well.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615108344



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -318,6 +337,28 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner {
   std::shared_ptr<Fragment> fragment_;
 };
 
+class ARROW_DS_EXPORT AsyncScanner : public Scanner,

Review comment:
       Done.







[GitHub] [arrow] westonpace commented on pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#issuecomment-821556377


   I rebased onto @lidavidm's latest changes.  At this point I don't think there are any more outstanding dataset PRs to rebase on, so I think this one is probably ready to merge if it passes review.





[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615106192



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -36,8 +36,20 @@ constexpr int64_t kNumberChildDatasets = 2;
 constexpr int64_t kNumberBatches = 16;
 constexpr int64_t kBatchSize = 1024;
 
-class TestScanner : public DatasetFixtureMixin {
+struct PrintIsAsyncParam {
+  std::string operator()(::testing::TestParamInfo<bool> info) {
+    if (info.param) {
+      return "async";
+    } else {
+      return "sync";
+    }
+  }
+};
+
+class TestScanner : public DatasetFixtureMixinWithParam<bool> {

Review comment:
       I've absorbed `UseThreads` into the matrix.
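As a rough sketch of what such a test matrix could look like, the block below replaces the single `bool` parameter with a small struct. The struct name, its fields, and the naming scheme are assumptions made for illustration; they are not taken from the PR.

```cpp
#include <cassert>
#include <initializer_list>
#include <string>
#include <vector>

// Hypothetical parameter struct; the name and fields are assumptions.
struct TestScannerParams {
  bool use_async;
  bool use_threads;

  // Builds a readable test-name suffix such as "async_threaded".
  std::string ToString() const {
    std::string name = use_async ? "async" : "sync";
    name += use_threads ? "_threaded" : "_serial";
    return name;
  }

  // Enumerates every combination so each test runs under the full matrix.
  static std::vector<TestScannerParams> Values() {
    std::vector<TestScannerParams> values;
    for (bool use_async : {false, true}) {
      for (bool use_threads : {false, true}) {
        values.push_back({use_async, use_threads});
      }
    }
    return values;
  }
};
```

With GoogleTest, a list like this would typically be handed to `INSTANTIATE_TEST_SUITE_P` through `::testing::ValuesIn`, with a name generator that calls `ToString()`.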







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615107986



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -166,13 +169,29 @@ class ARROW_DS_EXPORT ScanTask {
   std::shared_ptr<Fragment> fragment_;
 };
 
-template <typename T>
-struct Enumerated {
-  T value;
-  int index;
-  bool last;
+/// \brief A trivial ScanTask that yields the RecordBatch of an array.

Review comment:
       Fixed.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613435402



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Oh.  I think you are right.  I should probably add a scanner unit test that generates more than one scan task.  I'll work on that.







[GitHub] [arrow] lidavidm commented on pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#issuecomment-822735167


   Integration build: usual issues
   All MacOS builds: ARROW-12467 (need to account for LLVM 12)
   Travis: I think this is a flake
   AppVeyor: this is a flake, albeit a common one





[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613553323



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Ah good point. It is mostly just a nit as it's really a testing parameter that's unfortunately getting exposed in the public API.
   
   This isn't a very strong precedent, but TableBatchReader handles batch_size by letting you set it after construction and that feels like an analogue of this. 
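The pattern being suggested, a tuning knob set after construction rather than passed as a constructor argument, might look roughly like the sketch below. `RowBatchReader` and `set_max_batch_size` are hypothetical names used only for illustration; this is not the `TableBatchReader` API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical reader that hands out a fixed number of rows in batches.
class RowBatchReader {
 public:
  explicit RowBatchReader(int64_t num_rows) : num_rows_(num_rows) {}

  // Optional knob applied after construction; the constructor stays minimal.
  void set_max_batch_size(int64_t max_batch_size) { max_batch_size_ = max_batch_size; }

  // Returns the row count of the next batch, or 0 once all rows are consumed.
  int64_t NextBatchRows() {
    int64_t rows = std::min(max_batch_size_, num_rows_ - offset_);
    offset_ += rows;
    return rows;
  }

 private:
  int64_t num_rows_;
  int64_t offset_ = 0;
  int64_t max_batch_size_ = std::numeric_limits<int64_t>::max();
};
```

A test can shrink the batch size to force several batches out of one source, while ordinary callers never see the parameter in the constructor.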







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613321431



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +365,228 @@ Future<std::shared_ptr<Table>> SyncScanner::ToTableInternal(
           });
 }
 
+namespace {
+
+inline Result<EnumeratedRecordBatch> DoFilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, const EnumeratedRecordBatch& in) {
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+                        SimplifyWithGuarantee(scanner->options()->filter,
+                                              in.fragment.value->partition_expression()));
+
+  compute::ExecContext exec_context{scanner->options()->pool};
+  ARROW_ASSIGN_OR_RAISE(
+      Datum mask, ExecuteScalarExpression(simplified_filter, Datum(in.record_batch.value),
+                                          &exec_context));
+
+  Datum filtered;
+  if (mask.is_scalar()) {
+    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+    if (mask_scalar.is_valid && mask_scalar.value) {
+      // filter matches entire table
+      filtered = in.record_batch.value;
+    } else {
+      // Filter matches nothing
+      filtered = in.record_batch.value->Slice(0, 0);
+    }
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        filtered, compute::Filter(in.record_batch.value, mask,
+                                  compute::FilterOptions::Defaults(), &exec_context));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
+                        SimplifyWithGuarantee(scanner->options()->projection,
+                                              in.fragment.value->partition_expression()));
+  ARROW_ASSIGN_OR_RAISE(
+      Datum projected,
+      ExecuteScalarExpression(simplified_projection, filtered, &exec_context));
+
+  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
+  if (projected.shape() == ValueDescr::SCALAR) {
+    // Only virtual columns are projected. Broadcast to an array
+    ARROW_ASSIGN_OR_RAISE(
+        projected,
+        MakeArrayFromScalar(*projected.scalar(), filtered.record_batch()->num_rows(),
+                            scanner->options()->pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto out,
+                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
+  auto projected_batch =
+      out->ReplaceSchemaMetadata(in.record_batch.value->schema()->metadata());
+
+  return EnumeratedRecordBatch{
+      {std::move(projected_batch), in.record_batch.index, in.record_batch.last},
+      in.fragment};
+}
+
+inline EnumeratedRecordBatchGenerator FilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, EnumeratedRecordBatchGenerator rbs) {
+  auto mapper = [scanner](const EnumeratedRecordBatch& in) {
+    return DoFilterAndProjectRecordBatchAsync(scanner, in);
+  };
+  return MakeMappedGenerator<EnumeratedRecordBatch>(std::move(rbs), mapper);
+}
+
+Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
+    std::shared_ptr<AsyncScanner> scanner,
+    const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen,
+                        fragment.value->ScanBatchesAsync(*scanner->options()));
+  auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen));
+
+  auto combine_fn =
+      [fragment](const Enumerated<std::shared_ptr<RecordBatch>>& record_batch) {
+        return EnumeratedRecordBatch{record_batch, fragment};
+      };
+
+  auto combined_gen = MakeMappedGenerator<EnumeratedRecordBatch>(enumerated_batch_gen,
+                                                                 std::move(combine_fn));
+
+  return FilterAndProjectRecordBatchAsync(scanner, std::move(combined_gen));
+}
+
+Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
+    std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+  return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
+      std::move(enumerated_fragment_gen),
+      [scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+        return FragmentToBatches(scanner, fragment);
+      });
+}
+
+}  // namespace
+
+Result<FragmentGenerator> AsyncScanner::GetFragments() const {
+  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment> here.
+  // Current iterator based versions are all fast & sync so we will just ToVector it
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
+  return MakeVectorGenerator(std::move(fragments_vec));
+}
+
+Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen, ScanBatchesAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen,
+                        ScanBatchesUnorderedAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
+  auto table_fut = ToTableAsync(scan_options_->cpu_executor);
+  return table_fut.result();
+}
+
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
+    internal::Executor* cpu_executor) {
+  auto self = shared_from_this();
+  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(self, std::move(fragment_gen)));
+  return MakeConcatenatedGenerator(std::move(batch_gen_gen));

Review comment:
       Could this be MakeMergedGenerator?
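For readers unfamiliar with the distinction, here is a deliberately simplified, synchronous sketch. It assumes the usual meaning of the two names: concatenation drains one source at a time, in order, while merging emits from several sources as items become available. `Concatenate` and `MergeRoundRobin` are hypothetical helpers over plain vectors, not Arrow's generator utilities, and round-robin is only a stand-in for "whichever source is ready".

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Source = std::vector<int>;

// Drains each source fully, in order, before touching the next one.
std::vector<int> Concatenate(const std::vector<Source>& sources) {
  std::vector<int> out;
  for (const auto& source : sources) {
    out.insert(out.end(), source.begin(), source.end());
  }
  return out;
}

// Takes one item from each unfinished source per round. Order within a
// source is preserved; order across sources is not.
std::vector<int> MergeRoundRobin(const std::vector<Source>& sources) {
  std::vector<int> out;
  std::vector<std::size_t> positions(sources.size(), 0);
  bool progressed = true;
  while (progressed) {
    progressed = false;
    for (std::size_t i = 0; i < sources.size(); ++i) {
      if (positions[i] < sources[i].size()) {
        out.push_back(sources[i][positions[i]++]);
        progressed = true;
      }
    }
  }
  return out;
}
```

Since the method under review is the unordered scan, giving up cross-fragment ordering in exchange for not waiting on one slow fragment is the trade-off the question points at.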

##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -95,6 +95,55 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> opt
   return MakeMapIterator(fn, std::move(batches_it));
 }
 
+Result<RecordBatchGenerator> InMemoryFragment::ScanBatchesAsync(
+    const ScanOptions& options) {
+  struct State {
+    State(std::shared_ptr<InMemoryFragment> fragment, int64_t batch_size)
+        : fragment(std::move(fragment)),
+          batch_index(0),
+          offset(0),
+          batch_size(batch_size) {}
+
+    std::shared_ptr<RecordBatch> Next() {
+      const auto& next_parent = fragment->record_batches_[batch_index];
+      if (offset < next_parent->num_rows()) {
+        auto next = next_parent->Slice(offset, batch_size);
+        offset += batch_size;
+        return next;
+      }
+      batch_index++;
+      offset = 0;
+      return nullptr;
+    }
+
+    bool Finished() { return batch_index >= fragment->record_batches_.size(); }
+
+    std::shared_ptr<InMemoryFragment> fragment;
+    std::size_t batch_index;
+    int64_t offset;
+    int64_t batch_size;
+  };
+
+  struct Generator {
+    Generator(std::shared_ptr<InMemoryFragment> fragment, int64_t batch_size)
+        : state(std::make_shared<State>(std::move(fragment), batch_size)) {}
+
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      while (!state->Finished()) {
+        auto next = state->Next();
+        if (next) {
+          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));
+        }
+      }
+      return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+    }
+
+    std::shared_ptr<State> state;
+  };
+  return Generator(std::dynamic_pointer_cast<InMemoryFragment>(shared_from_this()),

Review comment:
       nit: maybe `internal::checked_pointer_cast`? (though admittedly it's not used super consistently throughout the codebase)
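The idea behind a checked pointer cast can be sketched as follows. `CheckedPointerCast` is an illustrative stand-in written for this example, not Arrow's implementation, and the two fragment structs are simplified placeholders.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Illustrative stand-in: verify the downcast in debug builds, and use the
// cheaper static_pointer_cast when NDEBUG is defined.
template <typename To, typename From>
std::shared_ptr<To> CheckedPointerCast(std::shared_ptr<From> from) {
#ifdef NDEBUG
  return std::static_pointer_cast<To>(std::move(from));
#else
  auto to = std::dynamic_pointer_cast<To>(from);
  assert(to != nullptr || from == nullptr);  // a failed cast is a programming error
  return to;
#endif
}

// Simplified placeholder types for the example.
struct Fragment : std::enable_shared_from_this<Fragment> {
  virtual ~Fragment() = default;
};

struct InMemoryFragment : Fragment {
  int num_batches = 3;

  // shared_from_this() yields shared_ptr<Fragment>, so a downcast is needed.
  std::shared_ptr<InMemoryFragment> Self() {
    return CheckedPointerCast<InMemoryFragment>(shared_from_this());
  }
};
```

Compared with a bare `std::dynamic_pointer_cast`, this documents that the cast is expected to succeed and avoids the runtime type check in release builds.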

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Don't we need to check again if NextSync/NextAsync return the end marker? Otherwise, operator() will return a Future that resolves to the end marker and the consumer will stop early.
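The concern can be shown with a small synchronous model. In the sketch below, `Batch`, `ScanTask`, `FlattenedScan`, and `Drain` are all hypothetical names, and a null pointer plays the role of the end marker. When the current task runs out, `Next()` moves on to the following task instead of passing that task's end marker to the consumer. An asynchronous generator would presumably do the same by chaining a continuation rather than looping.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

using Batch = std::shared_ptr<int>;   // nullptr plays the role of the end marker
using ScanTask = std::vector<Batch>;  // the batches one scan task yields

class FlattenedScan {
 public:
  explicit FlattenedScan(std::vector<ScanTask> tasks) : tasks_(std::move(tasks)) {}

  // Returns the next batch, or nullptr only when every task is exhausted.
  Batch Next() {
    while (task_index_ < tasks_.size()) {
      const ScanTask& task = tasks_[task_index_];
      if (batch_index_ < task.size()) {
        return task[batch_index_++];
      }
      // The current task ended: advance and retry instead of forwarding its
      // end marker, which the consumer would read as the end of the scan.
      ++task_index_;
      batch_index_ = 0;
    }
    return nullptr;
  }

 private:
  std::vector<ScanTask> tasks_;
  std::size_t task_index_ = 0;
  std::size_t batch_index_ = 0;
};

// Helper for the example: drains the scan and collects the batch values.
std::vector<int> Drain(FlattenedScan scan) {
  std::vector<int> values;
  for (Batch batch = scan.Next(); batch != nullptr; batch = scan.Next()) {
    values.push_back(*batch);
  }
  return values;
}
```

Because of the loop, a scan task that yields no batches in the middle of the sequence is skipped rather than ending the scan early.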







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615107215



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       I capitulated and removed the argument.  Your comment about it just being a testing parameter is accurate.  I created a test in `arrow-dataset-file-test` that does not rely on `InMemoryDataset` to test this logic here.  I might in the future add some tests to `ScannerTest` that set a limit on scan options batch size to get coverage of the multiple batches per fragment case.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613414017



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       There is a silent precondition here that every fragment scan should return scan tasks that return at least 1 record batch (unless the entire fragment is empty in which case either 0 scan tasks or 1 scan task with 0 batches should both be ok).
   
   I know this precondition holds for IPC and CSV (by virtue of there being only one scan task) but wasn't sure about Parquet (i.e. can a push-down filter cause a batch-less scan task to be emitted in the middle of a set of scan tasks?)







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613554161



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Once SyncScanner goes away we could probably change `InMemoryFragment::record_batches_` to `InMemoryFragment::record_batch_`.  This reflects the spirit of "getting rid of scan tasks" better anyways.







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r614827209



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -318,6 +337,28 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner {
   std::shared_ptr<Fragment> fragment_;
 };
 
+class ARROW_DS_EXPORT AsyncScanner : public Scanner,

Review comment:
       Is this worth declaring in the header? We could keep it in the .cc file so that people don't instantiate it directly.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -244,6 +244,8 @@ Status ScannerBuilder::UseThreads(bool use_threads) {
   return Status::OK();
 }
 
+void ScannerBuilder::UseAsync(bool use_async) { scan_options_->use_async = use_async; }

Review comment:
       nit: all the other methods return Status (even if it's a bit pointless)
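
   A minimal sketch of the suggested signature, so that `UseAsync` matches the other setters. `Status`, `ScanOptions`, and `ScannerBuilder` below are simplified stand-ins written for this illustration, not the Arrow classes:

```cpp
#include <cassert>
#include <memory>
#include <string>

// Stand-in for arrow::Status; models only what the sketch needs.
class Status {
 public:
  static Status OK() { return Status(); }
  bool ok() const { return message_.empty(); }

 private:
  std::string message_;
};

// Stand-in for the scan options; only the two flags are modeled.
struct ScanOptions {
  bool use_threads = false;
  bool use_async = false;
};

// Stand-in for ScannerBuilder.
class ScannerBuilder {
 public:
  ScannerBuilder() : scan_options_(std::make_shared<ScanOptions>()) {}

  Status UseThreads(bool use_threads) {
    scan_options_->use_threads = use_threads;
    return Status::OK();
  }

  // Returns Status like the other setters, even though it cannot fail today.
  Status UseAsync(bool use_async) {
    scan_options_->use_async = use_async;
    return Status::OK();
  }

  const ScanOptions& options() const { return *scan_options_; }

 private:
  std::shared_ptr<ScanOptions> scan_options_;
};
```

   Returning `Status` keeps call sites uniform and leaves room for validation later without another signature change.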

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -166,13 +169,29 @@ class ARROW_DS_EXPORT ScanTask {
   std::shared_ptr<Fragment> fragment_;
 };
 
-template <typename T>
-struct Enumerated {
-  T value;
-  int index;
-  bool last;
+/// \brief A trivial ScanTask that yields the RecordBatch of an array.

Review comment:
       FWIW I had moved these helpers to the bottom of the file (outside the `@}`) so that they don't get picked up in the API docs as I'd basically consider them implementation details. Though at that point we may actually want them to be in one of the `_internal` headers.

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,58 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);

Review comment:
       nit: if we're going to heap-allocate them anyways, and the sync version takes a shared_ptr, maybe we can just take a shared_ptr here in the first place







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613415855



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +365,228 @@ Future<std::shared_ptr<Table>> SyncScanner::ToTableInternal(
           });
 }
 
+namespace {
+
+inline Result<EnumeratedRecordBatch> DoFilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, const EnumeratedRecordBatch& in) {
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+                        SimplifyWithGuarantee(scanner->options()->filter,
+                                              in.fragment.value->partition_expression()));
+
+  compute::ExecContext exec_context{scanner->options()->pool};
+  ARROW_ASSIGN_OR_RAISE(
+      Datum mask, ExecuteScalarExpression(simplified_filter, Datum(in.record_batch.value),
+                                          &exec_context));
+
+  Datum filtered;
+  if (mask.is_scalar()) {
+    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+    if (mask_scalar.is_valid && mask_scalar.value) {
+      // filter matches entire table
+      filtered = in.record_batch.value;
+    } else {
+      // Filter matches nothing
+      filtered = in.record_batch.value->Slice(0, 0);
+    }
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        filtered, compute::Filter(in.record_batch.value, mask,
+                                  compute::FilterOptions::Defaults(), &exec_context));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
+                        SimplifyWithGuarantee(scanner->options()->projection,
+                                              in.fragment.value->partition_expression()));
+  ARROW_ASSIGN_OR_RAISE(
+      Datum projected,
+      ExecuteScalarExpression(simplified_projection, filtered, &exec_context));
+
+  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
+  if (projected.shape() == ValueDescr::SCALAR) {
+    // Only virtual columns are projected. Broadcast to an array
+    ARROW_ASSIGN_OR_RAISE(
+        projected,
+        MakeArrayFromScalar(*projected.scalar(), filtered.record_batch()->num_rows(),
+                            scanner->options()->pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto out,
+                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
+  auto projected_batch =
+      out->ReplaceSchemaMetadata(in.record_batch.value->schema()->metadata());
+
+  return EnumeratedRecordBatch{
+      {std::move(projected_batch), in.record_batch.index, in.record_batch.last},
+      in.fragment};
+}
+
+inline EnumeratedRecordBatchGenerator FilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, EnumeratedRecordBatchGenerator rbs) {
+  auto mapper = [scanner](const EnumeratedRecordBatch& in) {
+    return DoFilterAndProjectRecordBatchAsync(scanner, in);
+  };
+  return MakeMappedGenerator<EnumeratedRecordBatch>(std::move(rbs), mapper);
+}
+
+Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
+    std::shared_ptr<AsyncScanner> scanner,
+    const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen,
+                        fragment.value->ScanBatchesAsync(*scanner->options()));
+  auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen));
+
+  auto combine_fn =
+      [fragment](const Enumerated<std::shared_ptr<RecordBatch>>& record_batch) {
+        return EnumeratedRecordBatch{record_batch, fragment};
+      };
+
+  auto combined_gen = MakeMappedGenerator<EnumeratedRecordBatch>(enumerated_batch_gen,
+                                                                 std::move(combine_fn));
+
+  return FilterAndProjectRecordBatchAsync(scanner, std::move(combined_gen));
+}
+
+Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
+    std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+  return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
+      std::move(enumerated_fragment_gen),
+      [scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+        return FragmentToBatches(scanner, fragment);
+      });
+}
+
+}  // namespace
+
+Result<FragmentGenerator> AsyncScanner::GetFragments() const {
+  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment> here.
+  // Current iterator based versions are all fast & sync so we will just ToVector it
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
+  return MakeVectorGenerator(std::move(fragments_vec));
+}
+
+Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen, ScanBatchesAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen,
+                        ScanBatchesUnorderedAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
+  auto table_fut = ToTableAsync(scan_options_->cpu_executor);
+  return table_fut.result();
+}
+
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
+    internal::Executor* cpu_executor) {
+  auto self = shared_from_this();
+  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(self, std::move(fragment_gen)));
+  return MakeConcatenatedGenerator(std::move(batch_gen_gen));

Review comment:
       It will need to be.  The problem is that `MakeMergedGenerator` is immediately consuming `EnumeratingGenerator` which is not async-reentrant.  `MakeMergedGenerator` (erroneously) pulls from the outer (the gen_gen) generator in an async-reentrant fashion.  I'll make a follow-up JIRA just to keep this one simple.
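
   To illustrate why an enumerating wrapper carries state between calls, here is a synchronous analogy. It is a sketch only: `EnumeratingSource`, `Source`, and the use of `std::optional` as the end marker are assumptions made for illustration, not Arrow's `EnumeratingGenerator`. Knowing whether an item is `last` requires having already pulled the item after it, so every call reads and updates a shared lookahead slot. In the async setting, a second call issued before the first has completed would presumably race on that slot.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>

// Mirrors the Enumerated struct from the diff: a value, its position, and
// whether it is the final item.
template <typename T>
struct Enumerated {
  T value;
  int index;
  bool last;
};

// Hypothetical synchronous source: returns std::nullopt once exhausted.
template <typename T>
using Source = std::function<std::optional<T>()>;

template <typename T>
class EnumeratingSource {
 public:
  // Pulls the first item eagerly so that a lookahead is always available.
  explicit EnumeratingSource(Source<T> source)
      : source_(std::move(source)), lookahead_(source_()), index_(0) {}

  // Returns the next enumerated item, or std::nullopt at the end.
  std::optional<Enumerated<T>> operator()() {
    if (!lookahead_.has_value()) {
      return std::nullopt;
    }
    T current = std::move(*lookahead_);
    // Pull one item ahead so we can tell whether `current` is the last one.
    lookahead_ = source_();
    return Enumerated<T>{std::move(current), index_++, !lookahead_.has_value()};
  }

 private:
  Source<T> source_;
  std::optional<T> lookahead_;
  int index_;
};
```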







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613417793



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +365,228 @@ Future<std::shared_ptr<Table>> SyncScanner::ToTableInternal(
           });
 }
 
+namespace {
+
+inline Result<EnumeratedRecordBatch> DoFilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, const EnumeratedRecordBatch& in) {
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+                        SimplifyWithGuarantee(scanner->options()->filter,
+                                              in.fragment.value->partition_expression()));
+
+  compute::ExecContext exec_context{scanner->options()->pool};
+  ARROW_ASSIGN_OR_RAISE(
+      Datum mask, ExecuteScalarExpression(simplified_filter, Datum(in.record_batch.value),
+                                          &exec_context));
+
+  Datum filtered;
+  if (mask.is_scalar()) {
+    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+    if (mask_scalar.is_valid && mask_scalar.value) {
+      // filter matches entire table
+      filtered = in.record_batch.value;
+    } else {
+      // Filter matches nothing
+      filtered = in.record_batch.value->Slice(0, 0);
+    }
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        filtered, compute::Filter(in.record_batch.value, mask,
+                                  compute::FilterOptions::Defaults(), &exec_context));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
+                        SimplifyWithGuarantee(scanner->options()->projection,
+                                              in.fragment.value->partition_expression()));
+  ARROW_ASSIGN_OR_RAISE(
+      Datum projected,
+      ExecuteScalarExpression(simplified_projection, filtered, &exec_context));
+
+  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
+  if (projected.shape() == ValueDescr::SCALAR) {
+    // Only virtual columns are projected. Broadcast to an array
+    ARROW_ASSIGN_OR_RAISE(
+        projected,
+        MakeArrayFromScalar(*projected.scalar(), filtered.record_batch()->num_rows(),
+                            scanner->options()->pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto out,
+                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
+  auto projected_batch =
+      out->ReplaceSchemaMetadata(in.record_batch.value->schema()->metadata());
+
+  return EnumeratedRecordBatch{
+      {std::move(projected_batch), in.record_batch.index, in.record_batch.last},
+      in.fragment};
+}
+
+inline EnumeratedRecordBatchGenerator FilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, EnumeratedRecordBatchGenerator rbs) {
+  auto mapper = [scanner](const EnumeratedRecordBatch& in) {
+    return DoFilterAndProjectRecordBatchAsync(scanner, in);
+  };
+  return MakeMappedGenerator<EnumeratedRecordBatch>(std::move(rbs), mapper);
+}
+
+Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
+    std::shared_ptr<AsyncScanner> scanner,
+    const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen,
+                        fragment.value->ScanBatchesAsync(*scanner->options()));
+  auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen));
+
+  auto combine_fn =
+      [fragment](const Enumerated<std::shared_ptr<RecordBatch>>& record_batch) {
+        return EnumeratedRecordBatch{record_batch, fragment};
+      };
+
+  auto combined_gen = MakeMappedGenerator<EnumeratedRecordBatch>(enumerated_batch_gen,
+                                                                 std::move(combine_fn));
+
+  return FilterAndProjectRecordBatchAsync(scanner, std::move(combined_gen));
+}
+
+Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
+    std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+  return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
+      std::move(enumerated_fragment_gen),
+      [scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+        return FragmentToBatches(scanner, fragment);
+      });
+}
+
+}  // namespace
+
+Result<FragmentGenerator> AsyncScanner::GetFragments() const {
+  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment> here.
+  // Current iterator based versions are all fast & sync so we will just ToVector it
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
+  return MakeVectorGenerator(std::move(fragments_vec));
+}
+
+Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen, ScanBatchesAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen,
+                        ScanBatchesUnorderedAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
+  auto table_fut = ToTableAsync(scan_options_->cpu_executor);
+  return table_fut.result();
+}
+
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
+    internal::Executor* cpu_executor) {
+  auto self = shared_from_this();
+  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(self, std::move(fragment_gen)));
+  return MakeConcatenatedGenerator(std::move(batch_gen_gen));

Review comment:
       ARROW-12386







[GitHub] [arrow] lidavidm commented on pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#issuecomment-821169242


   After this lands I can rebase and implement ScanBatchesAsync for IPC/Parquet and give that another test.





[GitHub] [arrow] lidavidm commented on pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#issuecomment-822619114


   I'll merge on green unless somebody else wants to take a look.





[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r616012753



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -51,6 +52,8 @@ constexpr int64_t kDefaultBatchSize = 1 << 20;
 constexpr int32_t kDefaultBatchReadahead = 32;
 constexpr int32_t kDefaultFragmentReadahead = 8;
 
+using FragmentGenerator = std::function<Future<std::shared_ptr<Fragment>>()>;

Review comment:
       I was able to move FragmentGenerator but `async_generator.h` was still needed for `Enumerated` which is a pity since `Enumerated` is small and self-contained.  Should I place it in its own file?







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613433698



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       I may be reading this wrong, but when we finish one scan task and move on to the next, since we just fall through here, we'll return a completed Future which contains a nullptr, which gets returned as the result of `operator()`. So the generator's consumer will think that the generator has ended, even though we still have more scan tasks.
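
   One way to avoid ending early is to keep pumping scan tasks until a batch is produced or every task is exhausted. A minimal synchronous sketch, assuming each scan task is just a vector of batches and `std::nullopt` stands in for the end-of-generator marker (`FlatteningGenerator`, `ScanTask`, and `Batch` are hypothetical names for this illustration, not the PR's types):

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

using Batch = int;                    // stand-in for std::shared_ptr<RecordBatch>
using ScanTask = std::vector<Batch>;  // stand-in for a scan task's batch iterator

class FlatteningGenerator {
 public:
  explicit FlatteningGenerator(std::vector<ScanTask> tasks)
      : tasks_(std::move(tasks)) {}

  // Returns the next batch, or std::nullopt only when all tasks are drained.
  std::optional<Batch> operator()() {
    while (task_index_ < tasks_.size()) {
      const ScanTask& task = tasks_[task_index_];
      if (batch_index_ < task.size()) {
        return task[batch_index_++];
      }
      // Current task is exhausted (or was empty): advance to the next task
      // instead of reporting the end of the whole stream.
      ++task_index_;
      batch_index_ = 0;
    }
    return std::nullopt;
  }

 private:
  std::vector<ScanTask> tasks_;
  std::size_t task_index_ = 0;
  std::size_t batch_index_ = 0;
};
```

   Because the loop skips over exhausted tasks, an empty scan task in the middle of the sequence does not end the stream in this sketch either.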







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613549219



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       I see that there's now a parameter to generate multiple scan tasks per fragment in InMemoryDataset - however, is that necessary? For one, it doesn't affect this code path, since this only affects file fragments. For another, it doesn't affect the scanner, which doesn't use scan tasks (directly); it'll use ScanBatchesAsync on the Fragment, which flattens all the scan tasks itself anyways.
   
   So I think the issue pointed out here doesn't show up in the tests purely because only Parquet fragments expose multiple scan tasks per fragment right now.







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615114195



##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -145,6 +145,23 @@ void AssertIteratorNext(T expected, Iterator<T>& it) {
   ASSERT_EQ(expected, actual);
 }
 
+template <typename T>
+Iterator<T> FailsAt(Iterator<T> source, int failure_index, Status failure) {
+  struct Iter {
+    Result<T> Next() {
+      if (index++ == failure_index) {
+        return failure;
+      }
+      return source.Next();
+    }
+    Iterator<T> source;
+    int failure_index;
+    Status failure;
+    int index;
+  };
+  return Iterator<T>(Iter{std::move(source), failure_index, std::move(failure), 0});
+}
+

Review comment:
       If I'm not mistaken, FailsAt and AssertBufferIteratorMatch are now entirely unused?

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -51,6 +52,8 @@ constexpr int64_t kDefaultBatchSize = 1 << 20;
 constexpr int32_t kDefaultBatchReadahead = 32;
 constexpr int32_t kDefaultFragmentReadahead = 8;
 
+using FragmentGenerator = std::function<Future<std::shared_ptr<Fragment>>()>;

Review comment:
       I think moving the subclass definitions means you can also move this alias and get rid of the async_generator.h include.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r616012277



##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -145,6 +145,23 @@ void AssertIteratorNext(T expected, Iterator<T>& it) {
   ASSERT_EQ(expected, actual);
 }
 
+template <typename T>
+Iterator<T> FailsAt(Iterator<T> source, int failure_index, Status failure) {
+  struct Iter {
+    Result<T> Next() {
+      if (index++ == failure_index) {
+        return failure;
+      }
+      return source.Next();
+    }
+    Iterator<T> source;
+    int failure_index;
+    Status failure;
+    int index;
+  };
+  return Iterator<T>(Iter{std::move(source), failure_index, std::move(failure), 0});
+}
+

Review comment:
       Correct.  I've removed them.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r612500764



##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -95,6 +95,33 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> opt
   return MakeMapIterator(fn, std::move(batches_it));
 }
 
+Result<RecordBatchGenerator> InMemoryFragment::ScanBatchesAsync(
+    const ScanOptions& options) {
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (batch_index >= self->record_batches_.size()) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      const auto& next_parent = self->record_batches_[batch_index];
+      if (offset + batch_size < next_parent->num_rows()) {
+        offset += batch_size;
+        auto next = next_parent->Slice(offset, batch_size);
+        return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));
+      }
+      batch_index++;
+      auto next = next_parent->Slice(offset, batch_size);
+      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));

Review comment:
       Yep.  This logic was all backwards.  I've since changed it to a while loop.  Just pushed the change.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615039709



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -166,13 +169,29 @@ class ARROW_DS_EXPORT ScanTask {
   std::shared_ptr<Fragment> fragment_;
 };
 
-template <typename T>
-struct Enumerated {
-  T value;
-  int index;
-  bool last;
+/// \brief A trivial ScanTask that yields the RecordBatch of an array.

Review comment:
       Oh shoot.  After the rebase I had two instances of the exact same method and this explains it.  I'll fix that up.







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r612469861



##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -95,6 +95,33 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> opt
   return MakeMapIterator(fn, std::move(batches_it));
 }
 
+Result<RecordBatchGenerator> InMemoryFragment::ScanBatchesAsync(
+    const ScanOptions& options) {
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (batch_index >= self->record_batches_.size()) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      const auto& next_parent = self->record_batches_[batch_index];
+      if (offset + batch_size < next_parent->num_rows()) {
+        offset += batch_size;
+        auto next = next_parent->Slice(offset, batch_size);
+        return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));
+      }
+      batch_index++;
+      auto next = next_parent->Slice(offset, batch_size);
+      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));

Review comment:
       A few things here:
   - Shouldn't `offset` be reset when we advance to the next batch?
   - The check for whether we've consumed the current batch should just be `offset < num_rows()` I think.
   - `next_parent->Slice` should come before we update the offset.
   - It might be easier to just recurse after advancing to the next batch, if we care about avoiding empty batches. Else, we should update `offset` after the second `Slice` call too.
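
       To make the points above concrete, here is a minimal sketch of the slicing logic written as a loop. It is not the code from this PR: `SliceRef` and `SliceGenerator` are hypothetical names, parent batches are modeled only by their row counts, and `batch_size` is assumed to be positive. It slices before advancing the offset, treats `offset < num_rows` as the "not yet consumed" check, resets the offset when moving to the next batch, and skips empty batches instead of emitting empty slices.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for a slice of a record batch: which parent batch it
// comes from, where it starts, and how many rows it covers.
struct SliceRef {
  std::size_t batch_index;
  int64_t offset;
  int64_t length;
};

// Hypothetical generator over parent batches, modeled only by row counts.
// Assumes batch_size > 0.
class SliceGenerator {
 public:
  SliceGenerator(std::vector<int64_t> rows_per_batch, int64_t batch_size)
      : rows_per_batch_(std::move(rows_per_batch)), batch_size_(batch_size) {}

  // Writes the next slice to *out and returns true, or returns false once
  // every parent batch has been consumed.
  bool Next(SliceRef* out) {
    while (batch_index_ < rows_per_batch_.size()) {
      const int64_t num_rows = rows_per_batch_[batch_index_];
      if (offset_ < num_rows) {
        // Take the slice first, then advance the offset past it.
        const int64_t length = std::min(batch_size_, num_rows - offset_);
        *out = SliceRef{batch_index_, offset_, length};
        offset_ += length;
        return true;
      }
      // Current batch is exhausted (or empty): advance and reset the offset.
      ++batch_index_;
      offset_ = 0;
    }
    return false;
  }

 private:
  std::vector<int64_t> rows_per_batch_;
  int64_t batch_size_;
  std::size_t batch_index_ = 0;
  int64_t offset_ = 0;
};
```

       With row counts {5, 0, 3} and a batch size of 2, this yields slices of 2, 2 and 1 rows from the first batch, nothing from the empty batch, and slices of 2 and 1 rows from the last one.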

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -36,8 +36,20 @@ constexpr int64_t kNumberChildDatasets = 2;
 constexpr int64_t kNumberBatches = 16;
 constexpr int64_t kBatchSize = 1024;
 
-class TestScanner : public DatasetFixtureMixin {
+struct PrintIsAsyncParam {
+  std::string operator()(::testing::TestParamInfo<bool> info) {
+    if (info.param) {
+      return "async";
+    } else {
+      return "sync";
+    }
+  }
+};
+
+class TestScanner : public DatasetFixtureMixinWithParam<bool> {

Review comment:
       ARROW-11797 uses the param to toggle UseThreads, so this will have to become a `std::pair<bool, bool>` (or really, just a custom struct) in the end.
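
       As a rough illustration of the custom-struct option, the parameter could look something like the sketch below. `ScanParams`, `ScanParamsName` and `AllScanParams` are hypothetical names, not anything in this PR or in ARROW-11797, and the GoogleTest wiring is left out so the snippet stands alone. The generated names stick to alphanumeric characters so that they could be used as test-parameter names.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical test parameter combining both toggles in one named struct,
// which reads better at call sites than std::pair<bool, bool>.
struct ScanParams {
  bool use_async;
  bool use_threads;
};

// Builds a readable, alphanumeric-only name for one parameter combination.
std::string ScanParamsName(const ScanParams& params) {
  std::string name = params.use_async ? "Async" : "Sync";
  name += params.use_threads ? "Threaded" : "Serial";
  return name;
}

// Enumerates every combination so a parameterized suite can cover all four.
std::vector<ScanParams> AllScanParams() {
  std::vector<ScanParams> all;
  for (bool use_async : {false, true}) {
    for (bool use_threads : {false, true}) {
      all.push_back(ScanParams{use_async, use_threads});
    }
  }
  return all;
}
```

       A name printer like `PrintIsAsyncParam` would then call `ScanParamsName(info.param)` rather than branching on a single bool.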







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613551889



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Fair point.  I think it's still necessary, but it could be renamed since the name is a bit vague.  In the async case it means "batches per fragment" and in the sync case it means "scan tasks per fragment".  It was enough to break the async scanner (it currently fails these tests).  I also agree that it doesn't expose this issue, so I'll add some more tests.







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r612491538



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -36,8 +36,20 @@ constexpr int64_t kNumberChildDatasets = 2;
 constexpr int64_t kNumberBatches = 16;
 constexpr int64_t kBatchSize = 1024;
 
-class TestScanner : public DatasetFixtureMixin {
+struct PrintIsAsyncParam {
+  std::string operator()(::testing::TestParamInfo<bool> info) {
+    if (info.param) {
+      return "async";
+    } else {
+      return "sync";
+    }
+  }
+};
+
+class TestScanner : public DatasetFixtureMixinWithParam<bool> {

Review comment:
       I'll tackle that when I rebase ARROW-11797







[GitHub] [arrow] westonpace commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r615107817



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,58 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
                        std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient.  Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);

Review comment:
       Done.







[GitHub] [arrow] lidavidm commented on a change in pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r616014031



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -51,6 +52,8 @@ constexpr int64_t kDefaultBatchSize = 1 << 20;
 constexpr int32_t kDefaultBatchReadahead = 32;
 constexpr int32_t kDefaultFragmentReadahead = 8;
 
+using FragmentGenerator = std::function<Future<std::shared_ptr<Fragment>>()>;

Review comment:
       Ah, that's alright then.







[GitHub] [arrow] lidavidm closed pull request #10008: ARROW-12289: [C++] Create basic AsyncScanner implementation

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #10008:
URL: https://github.com/apache/arrow/pull/10008


   

