Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/07 21:58:36 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #9589: ARROW-11797: [C++][Dataset] Provide batch stream Scanner methods

lidavidm commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r609096011



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -165,6 +166,9 @@ class ARROW_DS_EXPORT Scanner {
   /// Scan result in memory before creating the Table.
   Result<std::shared_ptr<Table>> ToTable();
 
+  /// \brief ToBatches returns an iterator over all Batches yielded by this scan.
+  Result<RecordBatchIterator> ToBatches();

Review comment:
       You have a point. I didn't want to take the name `ScanBatches`, since your method has a different signature, but maybe that one could be `ScanIndexedBatches` or something.
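
For illustration, a minimal sketch of how the two could sit side by side; the `IndexedBatch` struct, the `ScannerSketch` class, and the `ScanIndexedBatches` signature below are hypothetical, not taken from either PR:

    // Hypothetical sketch for signature comparison only; nothing here is
    // taken verbatim from either PR.
    #include <memory>

    #include "arrow/record_batch.h"   // RecordBatch, RecordBatchIterator
    #include "arrow/result.h"         // Result
    #include "arrow/util/iterator.h"  // Iterator

    namespace arrow {
    namespace dataset {

    // A batch tagged with the scan task that produced it; the extra field is
    // what makes the element type differ from a plain RecordBatchIterator.
    struct IndexedBatch {
      std::shared_ptr<RecordBatch> batch;
      int scan_task_index;
    };

    class ScannerSketch {  // hypothetical stand-in for Scanner
     public:
      Result<RecordBatchIterator> ToBatches();              // as in this PR
      Result<Iterator<IndexedBatch>> ScanIndexedBatches();  // possible indexed variant
    };

    }  // namespace dataset
    }  // namespace arrow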

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);
+    task_group->Append([state, id, scan_task]() {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        state->Push(std::move(batch), id);
+      }
+      return state->Finish(id);
+    });
+  }
+
+  Result<std::shared_ptr<RecordBatch>> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    ready.wait(lock, [this, &lock] {
+      while (pop_cursor < task_batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!task_batches[pop_cursor].empty()) return true;
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        // Finished draining current scan task, enqueue a new one
+        ++pop_cursor;
+        // Must unlock since serial task group will execute synchronously
+        lock.unlock();
+        PushScanTask();
+        lock.lock();
+      }
+      DCHECK(no_more_tasks);
+      // all scan tasks drained (or getting next task failed), terminate
+      return true;
+    });
+
+    if (pop_cursor == task_batches.size()) {
+      // Don't report an error until we yield up everything we can first
+      RETURN_NOT_OK(iteration_error);
+      return nullptr;
+    }
+
+    auto batch = std::move(task_batches[pop_cursor].front());
+    task_batches[pop_cursor].pop_front();
+    return batch;
+  }
+};
+
+constexpr int kToBatchesReadaheadLevel = 2;

Review comment:
       Right, so this does assume that a few files (if not all of them) fit into memory. It would be hard to add backpressure at the batch level, since with a serial task group we materialize _all_ record batches for a file before handing control back to the top level. (That's also not ideal for latency.) But I figure this is a stop-gap until your much better implementation lands.
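
For illustration, here is a minimal sketch of how a readahead constant like this might be consumed when wiring `ToBatchesState` up to an iterator; the free function and its name are hypothetical, not the PR's actual ToBatches() implementation:

    // Hypothetical wiring, assuming the ToBatchesState from the diff above.
    // Readahead (and therefore backpressure) only exists at the scan-task
    // level: once a task runs, all of its batches are materialized.
    Result<RecordBatchIterator> ToBatchesSketch(ScanTaskIterator scan_tasks,
                                                std::shared_ptr<TaskGroup> task_group) {
      auto state = std::make_shared<ToBatchesState>(std::move(scan_tasks),
                                                    std::move(task_group));
      // Prime a fixed number of scan tasks up front.
      for (int i = 0; i < kToBatchesReadaheadLevel; ++i) {
        state->PushScanTask();
      }
      // Each call to the returned iterator's Next() pops one batch, starting
      // another scan task whenever the current one is fully drained.
      return MakeFunctionIterator(
          [state]() -> Result<std::shared_ptr<RecordBatch>> { return state->Pop(); });
    }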

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);
+    task_group->Append([state, id, scan_task]() {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        state->Push(std::move(batch), id);
+      }
+      return state->Finish(id);
+    });
+  }
+
+  Result<std::shared_ptr<RecordBatch>> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    ready.wait(lock, [this, &lock] {
+      while (pop_cursor < task_batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!task_batches[pop_cursor].empty()) return true;
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        // Finished draining current scan task, enqueue a new one
+        ++pop_cursor;
+        // Must unlock since serial task group will execute synchronously
+        lock.unlock();
+        PushScanTask();
+        lock.lock();
+      }
+      DCHECK(no_more_tasks);
+      // all scan tasks drained (or getting next task failed), terminate
+      return true;
+    });
+
+    if (pop_cursor == task_batches.size()) {
+      // Don't report an error until we yield up everything we can first

Review comment:
       Right. I chose this because, once we implement Head(), I figure it won't matter to the user if a scan task they never actually want data from fails. Maybe consistency is better, though.
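
To make that concrete, a hedged sketch of what a Head() built on this iterator might look like; the function and its row-count handling are illustrative only, not part of this PR:

    // Hypothetical Head(): take batches until roughly n rows have been seen,
    // then stop. Because Pop() only surfaces iteration_error after everything
    // already produced is drained, a failure in a scan task the caller never
    // reaches is simply never reported here.
    Result<RecordBatchVector> HeadSketch(RecordBatchIterator batches, int64_t n) {
      RecordBatchVector out;
      while (n > 0) {
        ARROW_ASSIGN_OR_RAISE(auto batch, batches.Next());
        if (IsIterationEnd(batch)) break;  // fully drained; errors surfaced by now
        n -= batch->num_rows();
        out.push_back(std::move(batch));
      }
      return out;
    }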

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);
+    task_group->Append([state, id, scan_task]() {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        state->Push(std::move(batch), id);
+      }
+      return state->Finish(id);
+    });
+  }
+
+  Result<std::shared_ptr<RecordBatch>> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    ready.wait(lock, [this, &lock] {
+      while (pop_cursor < task_batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!task_batches[pop_cursor].empty()) return true;
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        // Finished draining current scan task, enqueue a new one
+        ++pop_cursor;
+        // Must unlock since serial task group will execute synchronously
+        lock.unlock();
+        PushScanTask();
+        lock.lock();
+      }
+      DCHECK(no_more_tasks);
+      // all scan tasks drained (or getting next task failed), terminate

Review comment:
       Eventually we'll run out of scan tasks/batches, since an error in getting the next scan task won't stop the already-started scan tasks from eventually completing. So really it's "all scan tasks drained (and maybe we didn't start all of them due to a failure)".
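
Put differently, a consumer loop over this iterator terminates in both cases; if getting a later scan task failed, that error only shows up as the final non-OK Next() after everything already produced has been yielded. A minimal usage sketch, not from the PR:

    // Hypothetical consumer of the batch iterator returned by ToBatches().
    Status ConsumeAll(RecordBatchIterator batches) {
      while (true) {
        // A non-OK Next() here means iteration_error, surfaced only after all
        // batches from the already-started scan tasks were consumed.
        ARROW_ASSIGN_OR_RAISE(auto batch, batches.Next());
        if (IsIterationEnd(batch)) return Status::OK();
        // ... process batch ...
      }
    }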




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org