You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/17 02:49:05 UTC

[GitHub] [arrow] westonpace opened a new pull request #10076: [C++] Support file parallelism in AsyncScanner

westonpace opened a new pull request #10076:
URL: https://github.com/apache/arrow/pull/10076


   This PR adds file parallelism (with appropriate file readahead limits) into the async scanner.
   
   **DRAFT**: This PR depends on ARROW-12289 and so will remain in the draft state until the parent is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r621809458



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+            }
+          }
+        }
+      }
+    }
+    return collected;
+  }
+
+  struct FragmentStats {
+    int last_index;
+    bool seen;
+  };
+
+  std::vector<FragmentStats> GetFragmentStats(const std::vector<int>& order) {
+    auto last_indices = GetLastIndices(order);
+    std::vector<FragmentStats> fragment_stats;
+    for (std::size_t i = 0; i < last_indices.size(); i++) {
+      fragment_stats.push_back({last_indices[i], false});
+    }
+    return fragment_stats;
+  }
+
+  /// When data arrives out of order then we first have to buffer up 1 item in order to
+  /// know when the last item has arrived (so we can mark it as the last).  This means
+  /// sometimes we deliver an item and don't get one (first in a fragment) and sometimes
+  /// we deliver an item and we end up getting two (last in a fragment)
+  std::vector<EnumeratedRecordBatch> DeliverAndCollect(
+      std::vector<int> order, EnumeratedRecordBatchGenerator gen) {
+    std::vector<EnumeratedRecordBatch> collected;
+    auto fragment_stats = GetFragmentStats(order);
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      if (static_cast<int>(i) == fragment_stats[fragment_index].last_index) {
+        dataset_->FinishFragment(fragment_index);
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+      if (!fragment_stats[fragment_index].seen) {
+        fragment_stats[fragment_index].seen = true;
+      } else {
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+    }
+    return collected;
+  }
+
+  std::shared_ptr<Scanner> MakeScanner(int fragment_readahead = 0) {
+    ScannerBuilder builder(dataset_);
+    // Reordering tests only make sense for async
+    builder.UseAsync(true);
+    if (fragment_readahead != 0) {
+      builder.FragmentReadahead(fragment_readahead);
+    }
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    return scanner;
+  }
+
+  void AssertBatchesInOrder(const std::vector<TaggedRecordBatch>& batches,
+                            std::vector<int> expected_order) {
+    ASSERT_EQ(expected_order.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_order[i], batches[i].record_batch->num_rows());
+    }
+  }
+
+  void AssertBatchesInOrder(const std::vector<EnumeratedRecordBatch>& batches,
+                            std::vector<int> expected_batch_indices,
+                            std::vector<int> expected_row_sizes) {
+    ASSERT_EQ(expected_batch_indices.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_row_sizes[i], batches[i].record_batch.value->num_rows());
+      ASSERT_EQ(expected_batch_indices[i], batches[i].record_batch.index);
+    }
+  }
+
+  std::shared_ptr<ControlledDataset> dataset_;
+};
+
+TEST_F(TestReordering, ScanBatches) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 1, 4, 2, 3});
+}
+
+TEST_F(TestReordering, ScanBatchesUnordered) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
+}
+
+struct BatchConsumer {
+  explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
+      : generator(generator), next() {}
+
+  void AssertCanConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK(next);
+    next = Future<EnumeratedRecordBatch>();
+  }
+
+  void AssertCannotConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    SleepABit();
+    ASSERT_FALSE(next.is_finished());
+  }
+
+  void AssertFinished() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto last, next);
+    ASSERT_TRUE(IsIterationEnd(last));
+  }
+
+  EnumeratedRecordBatchGenerator generator;
+  Future<EnumeratedRecordBatch> next;
+};
+
+TEST_F(TestReordering, FileReadahead) {
+  auto scanner = MakeScanner(1);
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  BatchConsumer consumer(std::move(batch_gen));
+  dataset_->DeliverBatch(0, 0);
+  dataset_->DeliverBatch(0, 1);
+  consumer.AssertCanConsume();
+  consumer.AssertCannotConsume();
+  dataset_->DeliverBatch(1, 0);
+  consumer.AssertCannotConsume();
+  dataset_->FinishFragment(1);

Review comment:
       I added a bit more description.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619480153



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+            }
+          }
+        }
+      }
+    }
+    return collected;
+  }
+
+  struct FragmentStats {
+    int last_index;
+    bool seen;
+  };
+
+  std::vector<FragmentStats> GetFragmentStats(const std::vector<int>& order) {
+    auto last_indices = GetLastIndices(order);
+    std::vector<FragmentStats> fragment_stats;
+    for (std::size_t i = 0; i < last_indices.size(); i++) {
+      fragment_stats.push_back({last_indices[i], false});
+    }
+    return fragment_stats;
+  }
+
+  /// When data arrives out of order then we first have to buffer up 1 item in order to
+  /// know when the last item has arrived (so we can mark it as the last).  This means
+  /// sometimes we deliver an item and don't get one (first in a fragment) and sometimes
+  /// we deliver an item and we end up getting two (last in a fragment)
+  std::vector<EnumeratedRecordBatch> DeliverAndCollect(
+      std::vector<int> order, EnumeratedRecordBatchGenerator gen) {
+    std::vector<EnumeratedRecordBatch> collected;
+    auto fragment_stats = GetFragmentStats(order);
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      if (static_cast<int>(i) == fragment_stats[fragment_index].last_index) {
+        dataset_->FinishFragment(fragment_index);
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+      if (!fragment_stats[fragment_index].seen) {
+        fragment_stats[fragment_index].seen = true;
+      } else {
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+    }
+    return collected;
+  }
+
+  std::shared_ptr<Scanner> MakeScanner(int fragment_readahead = 0) {
+    ScannerBuilder builder(dataset_);
+    // Reordering tests only make sense for async
+    builder.UseAsync(true);
+    if (fragment_readahead != 0) {
+      builder.FragmentReadahead(fragment_readahead);
+    }
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    return scanner;
+  }
+
+  void AssertBatchesInOrder(const std::vector<TaggedRecordBatch>& batches,
+                            std::vector<int> expected_order) {
+    ASSERT_EQ(expected_order.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_order[i], batches[i].record_batch->num_rows());
+    }
+  }
+
+  void AssertBatchesInOrder(const std::vector<EnumeratedRecordBatch>& batches,
+                            std::vector<int> expected_batch_indices,
+                            std::vector<int> expected_row_sizes) {
+    ASSERT_EQ(expected_batch_indices.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_row_sizes[i], batches[i].record_batch.value->num_rows());
+      ASSERT_EQ(expected_batch_indices[i], batches[i].record_batch.index);
+    }
+  }
+
+  std::shared_ptr<ControlledDataset> dataset_;
+};
+
+TEST_F(TestReordering, ScanBatches) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 1, 4, 2, 3});
+}
+
+TEST_F(TestReordering, ScanBatchesUnordered) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
+}
+
+struct BatchConsumer {
+  explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
+      : generator(generator), next() {}
+
+  void AssertCanConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK(next);
+    next = Future<EnumeratedRecordBatch>();
+  }
+
+  void AssertCannotConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    SleepABit();
+    ASSERT_FALSE(next.is_finished());
+  }
+
+  void AssertFinished() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto last, next);
+    ASSERT_TRUE(IsIterationEnd(last));
+  }
+
+  EnumeratedRecordBatchGenerator generator;
+  Future<EnumeratedRecordBatch> next;
+};
+
+TEST_F(TestReordering, FileReadahead) {
+  auto scanner = MakeScanner(1);
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  BatchConsumer consumer(std::move(batch_gen));
+  dataset_->DeliverBatch(0, 0);
+  dataset_->DeliverBatch(0, 1);
+  consumer.AssertCanConsume();
+  consumer.AssertCannotConsume();
+  dataset_->DeliverBatch(1, 0);
+  consumer.AssertCannotConsume();
+  dataset_->FinishFragment(1);

Review comment:
       The purpose of this test is to make sure we are enforcing `fragment_readahead`.  Since `fragment_readahead` is set to 1 we will only read one fragment at a time.  So fragment 0 is the only "active" fragment and fragment 1 is still stuck in `batch_gen_gen`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619478106



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {

Review comment:
       Your interpretation is correct.  Later one, when we've delivered the last batch for the fragment, we also want to close the fragment (so it delivers an end-token on the iterator).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r620225452



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+            }
+          }
+        }
+      }
+    }
+    return collected;
+  }
+
+  struct FragmentStats {
+    int last_index;
+    bool seen;
+  };
+
+  std::vector<FragmentStats> GetFragmentStats(const std::vector<int>& order) {
+    auto last_indices = GetLastIndices(order);
+    std::vector<FragmentStats> fragment_stats;
+    for (std::size_t i = 0; i < last_indices.size(); i++) {
+      fragment_stats.push_back({last_indices[i], false});
+    }
+    return fragment_stats;
+  }
+
+  /// When data arrives out of order then we first have to buffer up 1 item in order to
+  /// know when the last item has arrived (so we can mark it as the last).  This means
+  /// sometimes we deliver an item and don't get one (first in a fragment) and sometimes
+  /// we deliver an item and we end up getting two (last in a fragment)
+  std::vector<EnumeratedRecordBatch> DeliverAndCollect(
+      std::vector<int> order, EnumeratedRecordBatchGenerator gen) {
+    std::vector<EnumeratedRecordBatch> collected;
+    auto fragment_stats = GetFragmentStats(order);
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      if (static_cast<int>(i) == fragment_stats[fragment_index].last_index) {
+        dataset_->FinishFragment(fragment_index);
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+      if (!fragment_stats[fragment_index].seen) {
+        fragment_stats[fragment_index].seen = true;
+      } else {
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+    }
+    return collected;
+  }
+
+  std::shared_ptr<Scanner> MakeScanner(int fragment_readahead = 0) {
+    ScannerBuilder builder(dataset_);
+    // Reordering tests only make sense for async
+    builder.UseAsync(true);
+    if (fragment_readahead != 0) {
+      builder.FragmentReadahead(fragment_readahead);
+    }
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    return scanner;
+  }
+
+  void AssertBatchesInOrder(const std::vector<TaggedRecordBatch>& batches,
+                            std::vector<int> expected_order) {
+    ASSERT_EQ(expected_order.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_order[i], batches[i].record_batch->num_rows());
+    }
+  }
+
+  void AssertBatchesInOrder(const std::vector<EnumeratedRecordBatch>& batches,
+                            std::vector<int> expected_batch_indices,
+                            std::vector<int> expected_row_sizes) {
+    ASSERT_EQ(expected_batch_indices.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_row_sizes[i], batches[i].record_batch.value->num_rows());
+      ASSERT_EQ(expected_batch_indices[i], batches[i].record_batch.index);
+    }
+  }
+
+  std::shared_ptr<ControlledDataset> dataset_;
+};
+
+TEST_F(TestReordering, ScanBatches) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 1, 4, 2, 3});
+}
+
+TEST_F(TestReordering, ScanBatchesUnordered) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
+}
+
+struct BatchConsumer {
+  explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
+      : generator(generator), next() {}
+
+  void AssertCanConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK(next);
+    next = Future<EnumeratedRecordBatch>();
+  }
+
+  void AssertCannotConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    SleepABit();
+    ASSERT_FALSE(next.is_finished());
+  }
+
+  void AssertFinished() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto last, next);
+    ASSERT_TRUE(IsIterationEnd(last));
+  }
+
+  EnumeratedRecordBatchGenerator generator;
+  Future<EnumeratedRecordBatch> next;
+};
+
+TEST_F(TestReordering, FileReadahead) {
+  auto scanner = MakeScanner(1);
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  BatchConsumer consumer(std::move(batch_gen));
+  dataset_->DeliverBatch(0, 0);
+  dataset_->DeliverBatch(0, 1);
+  consumer.AssertCanConsume();
+  consumer.AssertCannotConsume();
+  dataset_->DeliverBatch(1, 0);
+  consumer.AssertCannotConsume();
+  dataset_->FinishFragment(1);

Review comment:
       It might be nice to explain inline why we expect to get/not get a batch, or at least put something like `MakeScanner(/*fragment_readahead=*/1)` to call out the readahead level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619485334



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;

Review comment:
       Yes, that is correct.  And yes, these tests are rather tricky.  This test attempts to programmatically figure out what the output order should be based on the input order.  Another approach I could take is the approach in `TEST_F(TestReordering, FileReadahead)`.  In that test we explicitly specify the output order, leaving it up to the test author to figure out what it should be.
   
   While the test is tricky I felt it served as a sort of "self-documentation" of the foibles that need to be considered when looking at expected batch ordering.
   
   On the other hand, we could just get rid of the buffering.  I've created ARROW-12523 to consider that possibility.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619477344



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -115,7 +116,7 @@ class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {
 
     AssertScanBatchesUnorderedEquals(expected.get(), scanner.get(), 1);
   }
-};
+};  // namespace dataset

Review comment:
       I am using whatever version of clang-format ships with VSCode and it has format-on-save enabled.  This happens when I am in the middle of modifying some method and cutting/pasting code and end up (one way or another) with an extra closing brace.  E.g. I save with something like...
   
   ```
   int foo() {
     auto x = [] {}};
   }
   ```
   
   Then it seems that `clang-format` decides that final brace must be the closing brace for the namespace and so it adds the comment.  Later, when I fix the error and resave, it does not remove the namespace comment.
   
   I will investigate if a newer version of clang-format addresses this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619486400



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;

Review comment:
       I think it's overall OK, just had to read it carefully and get all the context paged back in. (GitHub reviews aren't the best for seeing all the other stuff going on.) Though if that could be removed, that would also be great. Obviously it doesn't have to hold up things here though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#issuecomment-822640874


   https://issues.apache.org/jira/browse/ARROW-12386


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619478321



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -115,7 +116,7 @@ class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {
 
     AssertScanBatchesUnorderedEquals(expected.get(), scanner.get(), 1);
   }
-};
+};  // namespace dataset

Review comment:
       Interesting, FWIW I've only ever used the `format` target from our build scripts which doesn't have this issue.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -480,13 +492,24 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
   return table_fut.result();
 }
 
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
+  return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
+}
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
   auto self = shared_from_this();
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
   ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
                         FragmentsToBatches(self, std::move(fragment_gen)));
-  return MakeConcatenatedGenerator(std::move(batch_gen_gen));
+  auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
+      std::move(batch_gen_gen), scan_options_->fragment_readahead);
+  return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                             scan_options_->fragment_readahead);
+}

Review comment:
       Ah ok, thanks for the explanation. I think I got mixed up because I thought MakeMergedGenerator could also itself queue up to one item per source.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619576323



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -115,7 +116,7 @@ class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {
 
     AssertScanBatchesUnorderedEquals(expected.get(), scanner.get(), 1);
   }
-};
+};  // namespace dataset

Review comment:
       I removed it manually for now.  I tried `ninja format` which runs `cd /home/pace/dev/arrow/cpp/conbench-build && /home/pace/anaconda3/envs/conbench2/bin/python3.7 /h.../dev/arrow/cpp/build-support/lint_exclusions.txt --source_dir /home/pace/dev/arrow/cpp/src --fix --quiet` and it had the same issue.  Although presumably, if you are doing format manually, you wouldn't run format until after everything is compiling correctly so this wouldn't be an issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm closed pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #10076:
URL: https://github.com/apache/arrow/pull/10076


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619189262



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -480,13 +492,24 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
   return table_fut.result();
 }
 
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
+  return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
+}
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
   auto self = shared_from_this();
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
   ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
                         FragmentsToBatches(self, std::move(fragment_gen)));
-  return MakeConcatenatedGenerator(std::move(batch_gen_gen));
+  auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
+      std::move(batch_gen_gen), scan_options_->fragment_readahead);
+  return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                             scan_options_->fragment_readahead);
+}

Review comment:
       So here we're teeing up at most `fragment_readahead` active fragments, of which we can queue up to `fragment_readahead` batches? And the net effect is that we read from each fragment in parallel.

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -115,7 +116,7 @@ class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {
 
     AssertScanBatchesUnorderedEquals(expected.get(), scanner.get(), 1);
   }
-};
+};  // namespace dataset

Review comment:
       I'm not sure what happened here - clang-format added this to the class declaration?
   
   (N.B. How are you using clang-format? I don't think it does this to me.)

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+            }
+          }
+        }
+      }
+    }
+    return collected;
+  }
+
+  struct FragmentStats {
+    int last_index;
+    bool seen;
+  };
+
+  std::vector<FragmentStats> GetFragmentStats(const std::vector<int>& order) {
+    auto last_indices = GetLastIndices(order);
+    std::vector<FragmentStats> fragment_stats;
+    for (std::size_t i = 0; i < last_indices.size(); i++) {
+      fragment_stats.push_back({last_indices[i], false});
+    }
+    return fragment_stats;
+  }
+
+  /// When data arrives out of order then we first have to buffer up 1 item in order to
+  /// know when the last item has arrived (so we can mark it as the last).  This means
+  /// sometimes we deliver an item and don't get one (first in a fragment) and sometimes
+  /// we deliver an item and we end up getting two (last in a fragment)
+  std::vector<EnumeratedRecordBatch> DeliverAndCollect(
+      std::vector<int> order, EnumeratedRecordBatchGenerator gen) {
+    std::vector<EnumeratedRecordBatch> collected;
+    auto fragment_stats = GetFragmentStats(order);
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      if (static_cast<int>(i) == fragment_stats[fragment_index].last_index) {
+        dataset_->FinishFragment(fragment_index);
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+      if (!fragment_stats[fragment_index].seen) {
+        fragment_stats[fragment_index].seen = true;
+      } else {
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+    }
+    return collected;
+  }
+
+  std::shared_ptr<Scanner> MakeScanner(int fragment_readahead = 0) {
+    ScannerBuilder builder(dataset_);
+    // Reordering tests only make sense for async
+    builder.UseAsync(true);
+    if (fragment_readahead != 0) {
+      builder.FragmentReadahead(fragment_readahead);
+    }
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    return scanner;
+  }
+
+  void AssertBatchesInOrder(const std::vector<TaggedRecordBatch>& batches,
+                            std::vector<int> expected_order) {
+    ASSERT_EQ(expected_order.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_order[i], batches[i].record_batch->num_rows());
+    }
+  }
+
+  void AssertBatchesInOrder(const std::vector<EnumeratedRecordBatch>& batches,
+                            std::vector<int> expected_batch_indices,
+                            std::vector<int> expected_row_sizes) {
+    ASSERT_EQ(expected_batch_indices.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_row_sizes[i], batches[i].record_batch.value->num_rows());
+      ASSERT_EQ(expected_batch_indices[i], batches[i].record_batch.index);
+    }
+  }
+
+  std::shared_ptr<ControlledDataset> dataset_;
+};
+
+TEST_F(TestReordering, ScanBatches) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 1, 4, 2, 3});
+}
+
+TEST_F(TestReordering, ScanBatchesUnordered) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
+}
+
+struct BatchConsumer {
+  explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
+      : generator(generator), next() {}
+
+  void AssertCanConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK(next);
+    next = Future<EnumeratedRecordBatch>();
+  }
+
+  void AssertCannotConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    SleepABit();
+    ASSERT_FALSE(next.is_finished());
+  }
+
+  void AssertFinished() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto last, next);
+    ASSERT_TRUE(IsIterationEnd(last));
+  }
+
+  EnumeratedRecordBatchGenerator generator;
+  Future<EnumeratedRecordBatch> next;
+};
+
+TEST_F(TestReordering, FileReadahead) {
+  auto scanner = MakeScanner(1);
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  BatchConsumer consumer(std::move(batch_gen));
+  dataset_->DeliverBatch(0, 0);
+  dataset_->DeliverBatch(0, 1);
+  consumer.AssertCanConsume();
+  consumer.AssertCannotConsume();
+  dataset_->DeliverBatch(1, 0);
+  consumer.AssertCannotConsume();
+  dataset_->FinishFragment(1);

Review comment:
       Since this is an unordered scan, why can't we deliver the last batch for fragment 1 at this point?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;

Review comment:
       (This is all to check my understanding, this is somewhat tricky…)

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment

Review comment:
       nit: bonus record batch?
   also nit: as explained below, this is because of the buffering?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be avoided if
+  /// delivering in order but easier to have a single code path).  We also can't deliver
+  /// items that don't come next.  These two facts make for some pretty complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;

Review comment:
       So `seen_fragment` is basically, 'have we delivered a batch to this fragment before' (since we can only expect to pull a batch on the 2nd delivery)?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {

Review comment:
       Ah though the usage below makes it clearer.

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which things are
+/// delivered so that we can test out of order resequencing.  The dataset allows
+/// batches to be delivered on any fragment.  When delivering batches a num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {

Review comment:
       I had to read this a few times to figure out what was going on. Given a vector which maps the Nth batch delivered to the Kth fragment, we get a vector mapping each fragment to the index of the last batch it delivered?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10076: ARROW-12386: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619474881



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -480,13 +492,24 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
   return table_fut.result();
 }
 
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
+  return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
+}
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
   auto self = shared_from_this();
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
   ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
                         FragmentsToBatches(self, std::move(fragment_gen)));
-  return MakeConcatenatedGenerator(std::move(batch_gen_gen));
+  auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
+      std::move(batch_gen_gen), scan_options_->fragment_readahead);
+  return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                             scan_options_->fragment_readahead);
+}

Review comment:
       > So here we're teeing up at most fragment_readahead active fragments, of which we can queue up to fragment_readahead batches?
   I don't know that I would phrase it that way.  The first part `we're teeing up at most fragment_readahead active fragments` is correct.  Each active fragment will queue 1 batch.  So across all active fragments there will be `fragment_readahead` queued batches.  However, there is not `fragment_readahead` batches per fragment.  I'm not sure which you were describing.
   
   The last sentence is correct assuming that both `batch_gen_gen` and the generated emitted by `batch_gen_gen` are truly asynchronous (e.g. they are calling `Executor::Submit` at some point).  The naive `FileFormat::ScanBatches` blocks and isn't truly asynchronous and so `MakeMergedGenerator` won't add parallelism.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -480,13 +492,24 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
   return table_fut.result();
 }
 
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
+  return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
+}
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
   auto self = shared_from_this();
   ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
   ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
                         FragmentsToBatches(self, std::move(fragment_gen)));
-  return MakeConcatenatedGenerator(std::move(batch_gen_gen));
+  auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
+      std::move(batch_gen_gen), scan_options_->fragment_readahead);
+  return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                             scan_options_->fragment_readahead);
+}

Review comment:
       > So here we're teeing up at most fragment_readahead active fragments, of which we can queue up to fragment_readahead batches?
   
   I don't know that I would phrase it that way.  The first part `we're teeing up at most fragment_readahead active fragments` is correct.  Each active fragment will queue 1 batch.  So across all active fragments there will be `fragment_readahead` queued batches.  However, there is not `fragment_readahead` batches per fragment.  I'm not sure which you were describing.
   
   The last sentence is correct assuming that both `batch_gen_gen` and the generated emitted by `batch_gen_gen` are truly asynchronous (e.g. they are calling `Executor::Submit` at some point).  The naive `FileFormat::ScanBatches` blocks and isn't truly asynchronous and so `MakeMergedGenerator` won't add parallelism.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10076: [C++] Support file parallelism in AsyncScanner

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#issuecomment-821754336


   <!--
     Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
     regarding copyright ownership.  The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License.  You may obtain a copy of the License at
   
       http://www.apache.org/licenses/LICENSE-2.0
   
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     KIND, either express or implied.  See the License for the
     specific language governing permissions and limitations
     under the License.
   -->
   
   Thanks for opening a pull request!
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/master/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW
   
   Opening JIRAs ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename pull request title in the following format?
   
       ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org