You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/12 17:34:51 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #9947: ARROW-12288: [C++] Create Scanner interface

bkietz commented on a change in pull request #9947:
URL: https://github.com/apache/arrow/pull/9947#discussion_r611719695



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion.  This information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+  Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+  static dataset::TaggedRecordBatch End() {
+    return dataset::TaggedRecordBatch{NULL, NULL};
+  }
+  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+    return val.record_batch == NULL;
+  }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+  static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, false}};
+  }
+  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+    return val.fragment.value == NULL;
+  }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each fragment has
+/// a potentially unique schema and possibly even format.  It should be possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a task to load
+/// the fragment into memory.  That task may or may not support parallel execution of
+/// its own.
 ///
-///  def Scan():
-///    for fragment in self.dataset.GetFragments(this.options.filter):
-///      for scan_task in fragment.Scan(this.options):
-///        yield scan_task
+/// The scanner is then responsible for creating scan tasks from every fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) but should
+/// return record batches as soon as they are ready to scan.  Various readahead

Review comment:
       ```suggestion
   /// The scanner should not buffer the entire dataset in memory (unless asked) instead
   /// yielding record batches as soon as they are ready to scan.  Various readahead
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -185,6 +196,45 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure that record batches found in reader are equal to the
+  /// record batches yielded by a scanner.
+  void AssertScanBatchesEquals(RecordBatchReader* expected, Scanner* scanner,
+                               bool ensure_drained = true) {
+    ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatches());
+
+    ARROW_EXPECT_OK(it.Visit([&](TaggedRecordBatch batch) -> Status {
+      AssertBatchEquals(expected, batch.record_batch.get());
+      return Status::OK();
+    }));
+
+    if (ensure_drained) {
+      EnsureRecordBatchReaderDrained(expected);
+    }
+  }
+
+  /// \brief Ensure that record batches found in reader are equal to the
+  /// record batches yielded by a scanner.

Review comment:
       ```suggestion
     /// record batches yielded by a scanner. Each fragment in the scanner is
     /// expected to have a single batch.
   ```

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion.  This information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+  Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+  static dataset::TaggedRecordBatch End() {
+    return dataset::TaggedRecordBatch{NULL, NULL};
+  }
+  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+    return val.record_batch == NULL;
+  }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+  static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, false}};
+  }
+  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+    return val.fragment.value == NULL;
+  }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each fragment has
+/// a potentially unique schema and possibly even format.  It should be possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a task to load
+/// the fragment into memory.  That task may or may not support parallel execution of
+/// its own.
 ///
-///  def Scan():
-///    for fragment in self.dataset.GetFragments(this.options.filter):
-///      for scan_task in fragment.Scan(this.options):
-///        yield scan_task
+/// The scanner is then responsible for creating scan tasks from every fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) but should
+/// return record batches as soon as they are ready to scan.  Various readahead
+/// properties control how much data is allowed to be scanned before pausing to let a
+/// slow consumer catch up.
+///
+/// Today the scanner also delegates projection & filtering although that may change in

Review comment:
       ```suggestion
   /// Today the scanner also handles projection & filtering although that may change in
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -136,6 +138,15 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure a record batch yielded by the fragment matches the next batch yielded
+  /// by the reader

Review comment:
       ```suggestion
     /// \brief Assert the value of the next batch yielded by the reader
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -136,6 +138,15 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure a record batch yielded by the fragment matches the next batch yielded
+  /// by the reader
+  void AssertBatchEquals(RecordBatchReader* expected, RecordBatch* batch) {

Review comment:
       Nit:
   ```suggestion
     void AssertBatchEquals(RecordBatchReader* expected, const RecordBatch& batch) {
   ```

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion.  This information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+  Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+  static dataset::TaggedRecordBatch End() {
+    return dataset::TaggedRecordBatch{NULL, NULL};
+  }
+  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+    return val.record_batch == NULL;
+  }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+  static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, false}};
+  }
+  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+    return val.fragment.value == NULL;
+  }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each fragment has
+/// a potentially unique schema and possibly even format.  It should be possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a task to load
+/// the fragment into memory.  That task may or may not support parallel execution of
+/// its own.
 ///
-///  def Scan():
-///    for fragment in self.dataset.GetFragments(this.options.filter):
-///      for scan_task in fragment.Scan(this.options):
-///        yield scan_task
+/// The scanner is then responsible for creating scan tasks from every fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) but should
+/// return record batches as soon as they are ready to scan.  Various readahead
+/// properties control how much data is allowed to be scanned before pausing to let a
+/// slow consumer catch up.
+///
+/// Today the scanner also delegates projection & filtering although that may change in
+/// the future.
 class ARROW_DS_EXPORT Scanner {
  public:
-  Scanner(std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> scan_options)
-      : dataset_(std::move(dataset)), scan_options_(std::move(scan_options)) {}
-
-  Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanOptions> scan_options)
-      : fragment_(std::move(fragment)), scan_options_(std::move(scan_options)) {}
+  virtual ~Scanner() = default;
 
   /// \brief The Scan operator returns a stream of ScanTask. The caller is
   /// responsible to dispatch/schedule said tasks. Tasks should be safe to run
   /// in a concurrent fashion and outlive the iterator.
-  Result<ScanTaskIterator> Scan();
-
+  ///
+  /// Note: Not supported by the async scanner
+  /// TODO(ARROW-11797) Deprecate Scan()
+  virtual Result<ScanTaskIterator> Scan();
   /// \brief Convert a Scanner into a Table.
   ///
   /// Use this convenience utility with care. This will serially materialize the
   /// Scan result in memory before creating the Table.
-  Result<std::shared_ptr<Table>> ToTable();
+  virtual Result<std::shared_ptr<Table>> ToTable() = 0;
+  /// \brief Scan the dataset into a stream of record batches.  Each batch is tagged
+  /// with the fragment it originated from.  The batches will arrive in order.  The
+  /// order of fragments is determined by the dataset.
+  ///
+  /// Note: The scanner will perform some readahead but will avoid materializing too
+  /// much in memory (this is governed by the readahead options and use_threads option).
+  /// If the readahead queue fills up then I/O will pause until the calling thread catches
+  /// up.
+  virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
+  /// \brief Scan the dataset into a stream of record batches.  Unlike ScanBatches this
+  /// method may allow record batches to be returned out of order.  This allows for more
+  /// efficient scanning some fragments may be accessed more quickly than others (e.g. may

Review comment:
       ```suggestion
     /// efficient scanning: some fragments may be accessed more quickly than others (e.g. may
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -213,7 +263,7 @@ class DatasetFixtureMixin : public ::testing::Test {
 
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<ScanOptions> options_;
-};
+};  // namespace dataset

Review comment:
       ```suggestion
   };
   ```

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -424,68 +424,21 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
                                 std::shared_ptr<Scanner> scanner) {
   RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
 
-  auto task_group = scanner->options()->TaskGroup();
-
-  // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
-  //
-  // - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
-  //   any fragments we have before waiting for discovery to complete. This isn't
-  //   currently implemented for FileSystemDataset anyway: ARROW-8613
-  //
-  // - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when
-  //   writing Fragments which produce scan tasks slowly. No Fragments do this.
-  //
-  // NB: neither of these will have any impact whatsoever on the common case of writing
-  //     an in-memory table to disk.
-  ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments());
-  ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector());
-  ScanTaskVector scan_tasks;
-  std::vector<Future<>> scan_futs;
-
-  for (const auto& fragment : fragments) {
-    auto options = std::make_shared<ScanOptions>(*scanner->options());
-    // Avoid contention with multithreaded readers
-    options->use_threads = false;
-    ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
-                          Scanner(fragment, std::move(options)).Scan());
-    for (auto maybe_scan_task : scan_task_it) {
-      ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
-      scan_tasks.push_back(std::move(scan_task));
-    }
-  }
-
-  // Store a mapping from partitions (represened by their formatted partition expressions)
-  // to a WriteQueue which flushes batches into that partition's output file. In principle
-  // any thread could produce a batch for any partition, so each task alternates between
-  // pushing batches and flushing them to disk.
+  // Store a mapping from partitions (represented by their formatted partition
+  // expressions) to a WriteQueue which flushes batches into that partition's output file.
+  // In principle any thread could produce a batch for any partition, so each task
+  // alternates between pushing batches and flushing them to disk.
   WriteState state(write_options);
 
-  for (const auto& scan_task : scan_tasks) {
-    if (scan_task->supports_async()) {
-      ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync());
-      std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
-          [&, scan_task](std::shared_ptr<RecordBatch> batch) {
-            return WriteNextBatch(state, scan_task, std::move(batch));
-          };
-      scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
-    } else {
-      task_group->Append([&, scan_task] {
-        ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
-
-        for (auto maybe_batch : batches) {
-          ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-          RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
-        }
+  ARROW_ASSIGN_OR_RAISE(auto batch_it, scanner->ScanBatches());
 
-        return Status::OK();
-      });
-    }
-  }
-  RETURN_NOT_OK(task_group->Finish());
-  auto scan_futs_all_done = AllComplete(scan_futs);
-  RETURN_NOT_OK(scan_futs_all_done.status());
+  RETURN_NOT_OK(batch_it.Visit([&state](TaggedRecordBatch tagged_batch) {

Review comment:
       This seems to remove parallel execution of batch partitioning. If that's intended, then please call it out and add a comment noting the follow-up JIRA.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org