Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/30 18:34:22 UTC

[GitHub] [arrow] fsaintjacques opened a new pull request #7075: ARROW-8447: [C++] Ensure row ordering in Scanner::ToTable

fsaintjacques opened a new pull request #7075:
URL: https://github.com/apache/arrow/pull/7075


   * This fixes a race in which ScanTasks pushed their batches into the shared, accumulating RecordBatchVector in completion order. The new version assigns an ordered index to each ScanTask, preserving the order in which the tasks were generated by Scanner::Scan.
   * Deprecate MakeRecordBatchReader in favor of RecordBatchReader::Make
   * Add Table::FromRecordBatchReader
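The ordering fix described in the first bullet can be sketched outside Arrow as follows. This is a minimal, hypothetical illustration, not the PR's code: an `int` stands in for a RecordBatch, each inner vector stands in for what one ScanTask produces, and `ScanInOrder` is an invented name. Each task gets its index at submission time and writes into that slot under a mutex, so flattening the slots yields submission order no matter which thread finishes first.

```cpp
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-ins: an int plays the role of a RecordBatch, and each
// inner vector is what one "scan task" would produce when executed.
using Batch = int;
using BatchVector = std::vector<Batch>;

std::vector<Batch> ScanInOrder(const std::vector<BatchVector>& task_outputs) {
  std::mutex mutex;                // guards `slots`, which may be resized
  std::vector<BatchVector> slots;  // one slot per task, indexed by task id
  std::vector<std::thread> threads;

  for (std::size_t id = 0; id < task_outputs.size(); ++id) {
    // `id` is fixed at submission time, like scan_task_id in the diff.
    threads.emplace_back([&mutex, &slots, &task_outputs, id] {
      // "Execute" the task into a local vector without holding the lock.
      BatchVector local = task_outputs[id];
      std::lock_guard<std::mutex> lock(mutex);
      // The number of tasks is not assumed to be known up front, so grow on demand.
      if (slots.size() <= id) slots.resize(id + 1);
      slots[id] = std::move(local);
    });
  }
  for (auto& t : threads) t.join();

  // Flatten in task-id order, not completion order.
  std::vector<Batch> flattened;
  for (auto& task_batches : slots) {
    for (auto& batch : task_batches) flattened.push_back(std::move(batch));
  }
  return flattened;
}
```

For example, with task outputs `{{1, 2}, {3}, {}, {4, 5, 6}}` the result is always `1, 2, 3, 4, 5, 6`, whereas appending each batch to one shared vector as it arrives would interleave them nondeterministically.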


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7075: ARROW-8447: [C++] Ensure row deterministic ordering in Scanner::ToTable

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7075:
URL: https://github.com/apache/arrow/pull/7075#issuecomment-622034299


   https://issues.apache.org/jira/browse/ARROW-8447





[GitHub] [arrow] bkietz commented on a change in pull request #7075: ARROW-8447: [C++] Ensure row deterministic ordering in Scanner::ToTable

bkietz commented on a change in pull request #7075:
URL: https://github.com/apache/arrow/pull/7075#discussion_r418216405



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -165,23 +165,47 @@ std::shared_ptr<TaskGroup> ScanContext::TaskGroup() const {
   return TaskGroup::MakeSerial();
 }
 
+static inline RecordBatchVector FlattenRecordBatchVector(
+    std::vector<RecordBatchVector> nested_batches) {
+  RecordBatchVector flattened;
+
+  for (auto& task_batches : nested_batches) {
+    for (auto& batch : task_batches) {
+      flattened.emplace_back(std::move(batch));
+    }
+  }
+
+  return flattened;
+}
+
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
-  std::mutex mutex;
-  RecordBatchVector batches;
-
   auto task_group = scan_context_->TaskGroup();
 
+  // Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<RecordBatchVector> batches;
+  size_t scan_task_id = 0;
   for (auto maybe_scan_task : scan_task_it) {
     ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
 
-    task_group->Append([&batches, &mutex, scan_task] {
+    auto id = scan_task_id++;
+    task_group->Append([&batches, &mutex, id, scan_task] {
       ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
 
+      RecordBatchVector local;
       for (auto maybe_batch : batch_it) {
         ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
+        local.emplace_back(std::move(batch));
+      }

Review comment:
       ```suggestion
         ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
   ```
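The suggestion replaces the hand-written loop with a single call, on the assumption that `ToVector()` has the same contract as the loop: collect every element, and return the first error encountered. A minimal C++17 sketch of that contract follows; `MiniResult` and the free function `ToVector` are hypothetical stand-ins (not Arrow's `Result` or `Iterator`), and a plain vector of fallible items replaces the batch iterator.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical minimal stand-in for arrow::Result<T> (NOT Arrow's type):
// holds either a value (index 0) or an error message (index 1).
template <typename T>
class MiniResult {
 public:
  static MiniResult Ok(T value) {
    return MiniResult(std::in_place_index<0>, std::move(value));
  }
  static MiniResult Error(std::string message) {
    return MiniResult(std::in_place_index<1>, std::move(message));
  }
  bool ok() const { return storage_.index() == 0; }
  const T& value() const { return std::get<0>(storage_); }
  const std::string& error() const { return std::get<1>(storage_); }

 private:
  template <std::size_t I, typename U>
  MiniResult(std::in_place_index_t<I> tag, U&& arg)
      : storage_(tag, std::forward<U>(arg)) {}
  std::variant<T, std::string> storage_;
};

// Hypothetical ToVector: collects all values, stopping at the first error,
// which mirrors the ARROW_ASSIGN_OR_RAISE loop in the diff.
template <typename T>
MiniResult<std::vector<T>> ToVector(const std::vector<MiniResult<T>>& items) {
  std::vector<T> out;
  for (const auto& item : items) {
    if (!item.ok()) return MiniResult<std::vector<T>>::Error(item.error());
    out.push_back(item.value());
  }
  return MiniResult<std::vector<T>>::Ok(std::move(out));
}
```

Collapsing the loop into one call keeps the error propagation in a single place, and the resulting vector can be moved into its task's slot directly.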

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -191,7 +215,8 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
   // Wait for all tasks to complete, or the first error.
   RETURN_NOT_OK(task_group->Finish());
 
-  return Table::FromRecordBatches(scan_options_->schema(), std::move(batches));
+  return Table::FromRecordBatches(scan_options_->schema(),
+                                  FlattenRecordBatchVector(batches));

Review comment:
       ```suggestion
                                     FlattenRecordBatchVector(std::move(batches)));
   ```
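The reason for this suggestion: `FlattenRecordBatchVector` takes its `std::vector<RecordBatchVector>` argument by value, so passing `batches` as an lvalue copies the outer vector and every inner vector, incrementing the reference count of every `shared_ptr` along the way. `std::move(batches)` hands the buffers over instead. The sketch below shows the difference with a hypothetical empty `Batch` struct and a `Flatten` function shaped like the helper in the diff.

```cpp
#include <memory>
#include <utility>
#include <vector>

struct Batch {};  // hypothetical stand-in for arrow::RecordBatch
using BatchVector = std::vector<std::shared_ptr<Batch>>;

// Same shape as FlattenRecordBatchVector in the diff: the nested vector is
// taken by value, so the caller decides whether it is copied or moved in.
BatchVector Flatten(std::vector<BatchVector> nested) {
  BatchVector flattened;
  for (auto& task_batches : nested) {
    for (auto& batch : task_batches) flattened.emplace_back(std::move(batch));
  }
  return flattened;
}
```

Calling `Flatten(nested)` leaves `nested` intact and adds one reference per batch; calling `Flatten(std::move(nested))` transfers the existing references, so no reference counts change.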



