You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/22 17:56:27 UTC

[GitHub] [arrow] lidavidm opened a new pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

lidavidm opened a new pull request #10134:
URL: https://github.com/apache/arrow/pull/10134


   This unifies (most of) the tests across Parquet, Feather, and CSV (with carve-outs for particular cases). In particular, all formats are now tested with both sync and async scanners, in both serial and threaded modes. Also, a set of common file format tests was refactored out of the individual format tests and centralized.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#issuecomment-825160462


   Ah, despite all the diffs, no tests should have been removed - they were only moved around, or consolidated where two tests were exactly the same.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#issuecomment-825622315


   Thanks for the quick review. I think everything's been addressed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618738629



##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {

Review comment:
       I could easily be convinced either way. I think it's also good to detect potential interaction bugs: the scanner could do various things with the scan options (like the projection issue you mentioned) that are hard to test exhaustively at the level of the individual formats.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm closed pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #10134:
URL: https://github.com/apache/arrow/pull/10134


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#issuecomment-825068972


   https://issues.apache.org/jira/browse/ARROW-12500


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618708458



##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {

Review comment:
       Originally I wanted to cover each file format through the actual scanner (right now we test each format in isolation, and the scanner in isolation). But, as you point out, that is overkill.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618709167



##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {
+    EXPECT_OK_AND_ASSIGN(auto schema, fragment->ReadPhysicalSchema());
+    auto dataset = std::make_shared<FragmentDataset>(schema, FragmentVector{fragment});
+    ScannerBuilder builder(dataset, opts_);
+    ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
+    ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
+    return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
+                           std::move(batch_it));
+  }
+
+  // Scan the fragment directly, without using the scanner.
+  RecordBatchIterator PhysicalBatches(std::shared_ptr<Fragment> fragment) {
+    if (GetParam().use_async) {
+      EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
+      EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
+      return batch_it;
+    }
+    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
+    return MakeFlattenIterator(MakeMaybeMapIterator(
+        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
+        std::move(scan_task_it)));
+  }
+
+  // Shared test cases
+  void TestScan(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = this->GetFileSource(reader.get());
+
+    this->SetSchema(reader->schema()->fields());
+    ASSERT_OK_AND_ASSIGN(auto fragment, format->MakeFragment(*source));
+
+    int64_t row_count = 0;
+    for (auto maybe_batch : Batches(fragment)) {
+      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+      row_count += batch->num_rows();
+    }
+    ASSERT_EQ(row_count, GetParam().expected_rows());
+  }
+
+  void TestScanProjected(FileFormat* format) {

Review comment:
       This tests whether each format consults the projection in the ScanOptions and returns only the columns needed to evaluate the projection and the filter later on. Notably, CSV did not do this properly before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618744791



##########
File path: cpp/src/arrow/dataset/file_csv_test.cc
##########
@@ -36,7 +37,21 @@
 namespace arrow {
 namespace dataset {
 
-class TestCsvFileFormat : public testing::TestWithParam<Compression::type> {
+class ArrowCsvWriterMixin {

Review comment:
       I originally tried abstract methods, but because of the inheritance tree you'd have to re-implement them in both the AbcFileFormatTest and AbcFileFormatScanTest fixtures (I had issues when trying to just have the latter inherit from the former). There is probably some combination of superclasses and interfaces that would make it work, but at that point I decided a template was the lesser evil.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618743453



##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {

Review comment:
       Added to ARROW-12311. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618624665



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -90,9 +90,9 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
       GetFragmentScanOptions<CsvFragmentScanOptions>(
           kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
   auto convert_options = csv_scan_options->convert_options;
-  for (FieldRef ref : scan_options->MaterializedFields()) {
-    ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOne(*scan_options->dataset_schema));
-
+  // Properly set conversion types even for non-materialized fields
+  // (since we're reading all of them anyways)
+  for (auto field : scan_options->dataset_schema->fields()) {

Review comment:
       Unifying the tests led me to discover this, which I think inadvertently fixes a bug I ran into in ARROW-9697 that I hadn't figured out before. There, I found that for CSV fragments, `to_table(schema=..., columns=[])` would fail while `to_table(schema=...)` did not, because in the former case the CSV reader would complain about an invalid cast, seemingly ignoring the provided schema. 
   
   I don't think this is the best solution, however; we should actually be deriving `include_columns`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r619174208



##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {

Review comment:
       On balance, I think I'll refactor this, since you plan to add fuller integration tests for datasets, which are a better place to catch more involved issues. I'll keep the test cases here, but only to directly test Scan/ScanBatches.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618731254



##########
File path: cpp/src/arrow/dataset/file_parquet_test.cc
##########
@@ -56,9 +52,25 @@ using testing::Pointee;
 
 using internal::checked_pointer_cast;
 
-class ArrowParquetWriterMixin : public ::testing::Test {
+class ArrowParquetWriterMixin {
  public:
-  Status WriteRecordBatch(const RecordBatch& batch, parquet::arrow::FileWriter* writer) {
+  static std::shared_ptr<Buffer> Write(RecordBatchReader* reader) {
+    auto pool = ::arrow::default_memory_pool();
+
+    std::shared_ptr<Buffer> out;
+
+    auto sink = CreateOutputStream(pool);
+
+    ARROW_EXPECT_OK(WriteRecordBatchReader(reader, pool, sink));
+    // XXX the rest of the test may crash if this fails, since out will be nullptr

Review comment:
       Minor, but could the method be changed to return `Result` so that this comment can go away?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10134: ARROW-12500: [C++][Datasets] Ensure better test coverage of Dataset file formats

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10134:
URL: https://github.com/apache/arrow/pull/10134#discussion_r618665298



##########
File path: cpp/src/arrow/dataset/file_csv_test.cc
##########
@@ -36,7 +37,21 @@
 namespace arrow {
 namespace dataset {
 
-class TestCsvFileFormat : public testing::TestWithParam<Compression::type> {
+class ArrowCsvWriterMixin {

Review comment:
       Naming nit: I'm not sure `mixin` is the right term here. When I think `mixin`, I think multiple inheritance. Maybe `Strategy`? Also, why "ArrowCsv"? Are there other CSV writers? Also, maybe there is a way to use composition here? Have an abstract method in `FileFormatFixtureMixin` that returns a `FormatStrategy` or something? I know "prefer composition over inheritance", but I don't know if there is a similar "prefer composition over templates" guideline :laughing: 

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {

Review comment:
       Rename to `TestInspectFailureWithRelevantError`?  There is no method named `Open` on `FileFormat`.

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();

Review comment:
       Nit: Maybe move this into a constructor to make it easier to modify options in the future? Or you could rename the method to `Initialize` so it is clearer that it must be called first?

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());

Review comment:
       These `ok` checks seem redundant, given that we are not testing `Result`/`Status` here and you check the `code` on the next line.

##########
File path: cpp/src/arrow/dataset/file_csv_test.cc
##########
@@ -199,18 +210,11 @@ TEST_P(TestCsvFileFormat, OpenFailureWithRelevantError) {
   if (GetCompression() != Compression::type::UNCOMPRESSED) {
     GTEST_SKIP() << "File source name is different with compression";

Review comment:
       Is this really a problem? Compression just adds to the filename, and we are testing with `HasSubstr`.

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,

Review comment:
       Nit: This is more of a helper function than a test. Perhaps rename it to `WriteToBuffer`? Also, we could create a `TestWrite` here that does...
   
   ```
     auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
     auto source = GetFileSource(reader.get());
     auto written = WriteToBuffer(format_.get(), reader->schema());
     AssertBufferEqual(*written, *source->buffer());
   ```
   
   ...that should be universally applicable.

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {

Review comment:
       Not for this PR, but I feel like there are a number of places where we have to do something like this to maintain implicit invariants. It might be nice to clean up `ScanOptions` at some point: change the name so it isn't `Options`, hide it from the public API, and give it more methods (like `SetFilter`) and private state.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -90,11 +90,16 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
       GetFragmentScanOptions<CsvFragmentScanOptions>(
           kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
   auto convert_options = csv_scan_options->convert_options;
-  for (FieldRef ref : scan_options->MaterializedFields()) {
-    ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOne(*scan_options->dataset_schema));
-
+  auto materialized = scan_options->MaterializedFields();
+  std::unordered_set<std::string> materialized_fields(materialized.begin(),
+                                                      materialized.end());
+  for (auto field : scan_options->dataset_schema->fields()) {
+    // Properly set conversion types for all fields
     if (column_names.find(field->name()) == column_names.end()) continue;
     convert_options.column_types[field->name()] = field->type();
+    // Only read the requested columns
+    if (materialized_fields.find(field->name()) == materialized_fields.end()) continue;

Review comment:
       Pretty minor nit, but maybe move this check above `convert_options.column_types...` so that `convert_options.column_types` doesn't include columns that are not in `convert_options.include_columns`.

##########
File path: cpp/src/arrow/dataset/file_csv_test.cc
##########
@@ -199,18 +210,11 @@ TEST_P(TestCsvFileFormat, OpenFailureWithRelevantError) {
   if (GetCompression() != Compression::type::UNCOMPRESSED) {
     GTEST_SKIP() << "File source name is different with compression";
   }
-  auto source = GetFileSource("");
-  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"),
-                                  format_->Inspect(*source).status());
-
-  constexpr auto file_name = "herp/derp";
-  ASSERT_OK_AND_ASSIGN(
-      auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
-  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(file_name),
-                                  format_->Inspect({file_name, fs}).status());
+  TestOpenFailureWithRelevantError(format_.get(), StatusCode::Invalid);

Review comment:
       If you add a `GetFormat` to the `WriterMixin`, you could probably save a lot of `format_.get()` arguments.

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {
+    EXPECT_OK_AND_ASSIGN(auto schema, fragment->ReadPhysicalSchema());
+    auto dataset = std::make_shared<FragmentDataset>(schema, FragmentVector{fragment});
+    ScannerBuilder builder(dataset, opts_);
+    ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
+    ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
+    return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
+                           std::move(batch_it));
+  }
+
+  // Scan the fragment directly, without using the scanner.
+  RecordBatchIterator PhysicalBatches(std::shared_ptr<Fragment> fragment) {
+    if (GetParam().use_async) {
+      EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
+      EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
+      return batch_it;
+    }
+    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
+    return MakeFlattenIterator(MakeMaybeMapIterator(
+        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
+        std::move(scan_task_it)));
+  }
+
+  // Shared test cases
+  void TestScan(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = this->GetFileSource(reader.get());
+
+    this->SetSchema(reader->schema()->fields());
+    ASSERT_OK_AND_ASSIGN(auto fragment, format->MakeFragment(*source));
+
+    int64_t row_count = 0;
+    for (auto maybe_batch : Batches(fragment)) {
+      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+      row_count += batch->num_rows();
+    }
+    ASSERT_EQ(row_count, GetParam().expected_rows());
+  }
+
+  void TestScanProjected(FileFormat* format) {

Review comment:
       If projection is applied by the scanner, why are we testing it here?

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {
+    EXPECT_OK_AND_ASSIGN(auto schema, fragment->ReadPhysicalSchema());
+    auto dataset = std::make_shared<FragmentDataset>(schema, FragmentVector{fragment});
+    ScannerBuilder builder(dataset, opts_);
+    ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
+    ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));

Review comment:
       There's probably no need to set `UseThreads` here; it will not affect how the `FileFormat` is interacted with.

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -303,6 +304,316 @@ template <typename P>
 class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
                                      public ::testing::WithParamInterface<P> {};
 
+struct TestScannerParams {
+  bool use_async;
+  bool use_threads;
+  int num_child_datasets;
+  int num_batches;
+  int items_per_batch;
+
+  int64_t total_batches() const { return num_child_datasets * num_batches; }
+
+  int64_t expected_rows() const { return total_batches() * items_per_batch; }
+
+  std::string ToString() const {
+    // GTest requires this to be alphanumeric
+    std::stringstream ss;
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r";
+    return ss.str();
+  }
+
+  static std::string ToTestNameString(
+      const ::testing::TestParamInfo<TestScannerParams>& info) {
+    return std::to_string(info.index) + info.param.ToString();
+  }
+
+  static std::vector<TestScannerParams> Values() {
+    std::vector<TestScannerParams> values;
+    for (int sync = 0; sync < 2; sync++) {
+      for (int use_threads = 0; use_threads < 2; use_threads++) {
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 1, 1, 1024});
+        values.push_back(
+            {static_cast<bool>(sync), static_cast<bool>(use_threads), 2, 16, 1024});
+      }
+    }
+    return values;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {
+  out << (params.use_async ? "async-" : "sync-")
+      << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets
+      << "d-" << params.num_batches << "b-" << params.items_per_batch << "i";
+  return out;
+}
+
+class FileFormatWriterMixin {
+  virtual std::shared_ptr<Buffer> Write(RecordBatchReader* reader) = 0;
+  virtual std::shared_ptr<Buffer> Write(const Table& table) = 0;
+};
+
+/// WriterMixin should be a class with these static methods:
+/// std::shared_ptr<Buffer> Write(RecordBatchReader* reader);
+template <typename WriterMixin>
+class FileFormatFixtureMixin : public ::testing::Test {
+ public:
+  constexpr static int64_t kBatchSize = 1UL << 12;
+  constexpr static int64_t kBatchRepetitions = 1 << 5;
+
+  int64_t expected_batches() const { return kBatchRepetitions; }
+  int64_t expected_rows() const { return kBatchSize * kBatchRepetitions; }
+
+  std::shared_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
+    auto buffer = WriterMixin::Write(reader);
+    return std::make_shared<FileSource>(std::move(buffer));
+  }
+
+  virtual std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) {
+    return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions);
+  }
+
+  Result<std::shared_ptr<io::BufferOutputStream>> GetFileSink() {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
+                          AllocateResizableBuffer(0));
+    return std::make_shared<io::BufferOutputStream>(buffer);
+  }
+
+  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
+    opts_ = std::make_shared<ScanOptions>();
+    opts_->dataset_schema = schema(std::move(fields));
+    ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names()));
+  }
+
+  void SetFilter(Expression filter) {
+    ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
+  }
+
+  void Project(std::vector<std::string> names) {
+    ASSERT_OK(SetProjection(opts_.get(), std::move(names)));
+  }
+
+  // Shared test cases
+  void TestOpenFailureWithRelevantError(FileFormat* format, StatusCode code) {
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    auto result = format->Inspect(FileSource(buf));
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr("<Buffer>"));
+
+    constexpr auto file_name = "herp/derp";
+    ASSERT_OK_AND_ASSIGN(
+        auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+    result = format->Inspect({file_name, fs});
+    EXPECT_FALSE(result.ok());
+    EXPECT_EQ(code, result.status().code());
+    EXPECT_THAT(result.status().ToString(), testing::HasSubstr(file_name));
+  }
+  void TestInspect(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, format->Inspect(*source.get()));
+    AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false);
+  }
+  void TestIsSupported(FileFormat* format) {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = GetFileSource(reader.get());
+
+    bool supported = false;
+
+    std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    buf = std::make_shared<Buffer>(util::string_view("corrupted"));
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(FileSource(buf)));
+    ASSERT_EQ(supported, false);
+
+    ASSERT_OK_AND_ASSIGN(supported, format->IsSupported(*source));
+    EXPECT_EQ(supported, true);
+  }
+  std::shared_ptr<Buffer> TestWrite(FileFormat* format, std::shared_ptr<Schema> schema,
+                                    std::shared_ptr<FileWriteOptions> options = nullptr) {
+    SetSchema(schema->fields());
+    EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+    if (!options) options = format->DefaultWriteOptions();
+    EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
+    ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
+    ARROW_EXPECT_OK(writer->Finish());
+    EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+    return written;
+  }
+
+ protected:
+  std::shared_ptr<ScanOptions> opts_ = std::make_shared<ScanOptions>();
+};
+
+template <typename Writer>
+class FileFormatScanMixin : public FileFormatFixtureMixin<Writer>,
+                            public ::testing::WithParamInterface<TestScannerParams> {
+ public:
+  int64_t expected_batches() const { return GetParam().total_batches(); }
+  int64_t expected_rows() const { return GetParam().expected_rows(); }
+
+  std::shared_ptr<RecordBatchReader> GetRecordBatchReader(
+      std::shared_ptr<Schema> schema) override {
+    return MakeGeneratedRecordBatch(schema, GetParam().items_per_batch,
+                                    GetParam().total_batches());
+  }
+
+  // Scan the fragment through the scanner.
+  RecordBatchIterator Batches(std::shared_ptr<Fragment> fragment) {

Review comment:
       What does this add to `FileFormat` testing above and beyond `ScanBatches`?  One could argue that `PhysicalBatches` is all you need.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org