You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/01 09:20:03 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #9620: ARROW-11843: [C++] Provide reentrant Parquet reader

pitrou commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r642908613



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -829,6 +829,9 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
   return out;
 }
 
+template <>
+inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}

Review comment:
       Hmm... can you explain what the point of this is? Also, why isn't the constructor defined directly at its declaration point in the class?

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -289,21 +358,94 @@ class SerializedFile : public ParquetFileReader::Contents {
           "Parquet magic bytes not found in footer. Either the file is corrupted or this "
           "is not a parquet file.");
     }
+    // Both encrypted/unencrypted footers have the same footer length check.
+    uint32_t metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
+        reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
+        kFooterSize);
+    if (metadata_len > source_size_ - kFooterSize) {
+      throw ParquetInvalidOrCorruptedFileException(
+          "Parquet file size is ", source_size_,
+          " bytes, smaller than the size reported by footer's (", metadata_len, "bytes)");
+    }
+    return metadata_len;
+  }
+
+  // Does not throw.
+  ::arrow::Future<> ParseMetaDataAsync() {
+    int64_t footer_read_size;
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    footer_read_size = GetFooterReadSize();
+    END_PARQUET_CATCH_EXCEPTIONS
+    // Assumes this is kept alive externally
+    return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
+        .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+                  -> ::arrow::Future<> {
+          uint32_t metadata_len;
+          BEGIN_PARQUET_CATCH_EXCEPTIONS
+          metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
+          END_PARQUET_CATCH_EXCEPTIONS
+          int64_t metadata_start = source_size_ - kFooterSize - metadata_len;
+
+          std::shared_ptr<::arrow::Buffer> metadata_buffer;
+          if (footer_read_size >= (metadata_len + kFooterSize)) {
+            metadata_buffer =
+                SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize,
+                            metadata_len);
+            return ParseMaybeEncryptedMetaDataAsync(footer_buffer,
+                                                    std::move(metadata_buffer),
+                                                    footer_read_size, metadata_len);
+          }
+          return source_->ReadAsync(metadata_start, metadata_len)
+              .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+                return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer,
+                                                        footer_read_size, metadata_len);
+              });
+        });
+  }
 
-    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
+  // Continuation
+  ::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
+      std::shared_ptr<::arrow::Buffer> footer_buffer,
+      std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
+      uint32_t metadata_len) {
+    // Parse the footer depending on encryption type
+    const bool is_encrypted_footer =
+        memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0;
+    if (is_encrypted_footer) {
       // Encrypted file with Encrypted footer.
-      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
-      return;
+      std::pair<int64_t, uint32_t> read_size;
+      BEGIN_PARQUET_CATCH_EXCEPTIONS
+      read_size =
+          ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len);
+      END_PARQUET_CATCH_EXCEPTIONS
+      // Read the actual footer
+      int64_t metadata_start = read_size.first;
+      metadata_len = read_size.second;
+      return source_->ReadAsync(metadata_start, metadata_len)
+          .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+            // Continue and read the file footer
+            return ParseMetaDataAsync(std::move(footer_buffer), metadata_buffer,
+                                      footer_read_size, metadata_len,
+                                      is_encrypted_footer);
+          });
     }
+    return ParseMetaDataAsync(std::move(footer_buffer), std::move(metadata_buffer),
+                              footer_read_size, metadata_len, is_encrypted_footer);
+  }
 
-    // No encryption or encryption with plaintext footer mode.
-    std::shared_ptr<Buffer> metadata_buffer;
-    uint32_t metadata_len, read_metadata_len;
-    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
-                                 &metadata_len, &read_metadata_len);
-
+  // Continuation
+  ::arrow::Status ParseMetaDataAsync(std::shared_ptr<::arrow::Buffer> footer_buffer,

Review comment:
       This function is not async; could you rename it? For example, `ParseMetadataBuffer`.

##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -21,6 +21,8 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/util/async_generator.h"

Review comment:
       This is a heavy include; can we avoid adding it?

##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -2331,6 +2330,63 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
   ASSERT_EQ(actual_batch->num_rows(), num_rows);
 }
 
+TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
+  ArrowReaderProperties properties = default_arrow_reader_properties();
+  const int num_rows = 1024;
+  const int row_group_size = 512;
+  const int num_columns = 2;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
+
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+  std::shared_ptr<FileReader> reader;
+  {
+    std::unique_ptr<FileReader> unique_reader;
+    FileReaderBuilder builder;
+    ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+    ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
+    reader = std::move(unique_reader);
+  }
+
+  auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch,
+                          int num_columns, int num_rows) {
+    ASSERT_NE(batch, nullptr);
+    ASSERT_EQ(batch->num_columns(), num_columns);
+    ASSERT_EQ(batch->num_rows(), num_rows);

Review comment:
       This doesn't seem to actually check the contents read from file. Could you do that?

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -563,6 +692,28 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
   return Open(std::move(source), props, std::move(metadata));
 }
 
+::arrow::Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
+    std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
+    std::shared_ptr<FileMetaData> metadata) {
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+  auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata));
+  // TODO(ARROW-12259): workaround since we have Future<(move-only type)>

Review comment:
       I don't understand this comment. Where is the workaround?

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -264,23 +264,92 @@ class SerializedFile : public ParquetFileReader::Contents {
       }
     }
     PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
-    return cached_source_->Wait();
   }
 
+  ::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
+                                 const std::vector<int>& column_indices) const {
+    if (!cached_source_) {
+      return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered");
+    }
+    std::vector<::arrow::io::ReadRange> ranges;
+    for (int row : row_groups) {
+      for (int col : column_indices) {
+        ranges.push_back(
+            ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
+      }
+    }
+    return cached_source_->WaitFor(ranges);
+  }
+
+  // Metadata/footer parsing. Divided up to separate sync/async paths, and to use
+  // exceptions for error handling (with the async path converting to Future/Status).
+
   void ParseMetaData() {

Review comment:
       Can we express this in terms of `ParseMetaDataAsync`? I would rather not have duplicate code paths for this.

##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -175,6 +177,19 @@ class PARQUET_EXPORT FileReader {
       const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
       std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
 
+  /// \brief Return a generator of record batches.
+  ///
+  /// The FileReader must outlive the generator, so this requires that you pass in a
+  /// shared_ptr.
+  ///
+  /// \returns error Result if either row_group_indices or column_indices contains an
+  ///     invalid index
+  virtual ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+  GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+                          const std::vector<int> row_group_indices,
+                          const std::vector<int> column_indices,
+                          ::arrow::internal::Executor* executor = NULLPTR) = 0;

Review comment:
       The docstring should explain whether the executor is meant for IO or CPU work.

##########
File path: cpp/src/parquet/encryption/test_encryption_util.cc
##########
@@ -334,7 +335,10 @@ void FileDecryptor::DecryptFile(
     reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
   }
 
-  auto file_reader = parquet::ParquetFileReader::OpenFile(file, false, reader_properties);
+  std::shared_ptr<::arrow::io::RandomAccessFile> source;
+  PARQUET_ASSIGN_OR_THROW(
+      source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));

Review comment:
       For the record, why did you need to change this?

##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -21,6 +21,8 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/util/async_generator.h"
+#include "arrow/util/optional.h"

Review comment:
       Is this include (`arrow/util/optional.h`) being used?

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -325,10 +469,9 @@ class SerializedFile : public ParquetFileReader::Contents {
 
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 
-  void ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
-                                    int64_t footer_read_size,
-                                    std::shared_ptr<Buffer>* metadata_buffer,
-                                    uint32_t* metadata_len, uint32_t* read_metadata_len);
+  // \return The true length of the metadata

Review comment:
       Length in bytes?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +980,102 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch generators (where each sub-generator is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using RecordBatchGenerator =
+      ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+      : arrow_reader_(std::move(arrow_reader)),
+        executor_(executor),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)),
+        index_(0) {}
+
+  ::arrow::Future<RecordBatchGenerator> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
+    }
+    int row_group = row_groups_[index_++];
+    std::vector<int> column_indices = column_indices_;
+    auto reader = arrow_reader_;
+    if (!reader->properties().pre_buffer()) {
+      return SubmitRead(executor_, reader, row_group, column_indices);
+    }
+    auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+    if (executor_) ready = executor_->Transfer(ready);
+    return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
+      return ReadOneRowGroup(reader, row_group, column_indices);
+    });
+  }
+
+ private:
+  // Synchronous fallback for when pre-buffer isn't enabled.
+  //
+  // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
+  // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
+  // async I/O without forcing readahead.
+  static ::arrow::Future<RecordBatchGenerator> SubmitRead(
+      ::arrow::internal::Executor* executor, std::shared_ptr<FileReaderImpl> self,
+      const int row_group, const std::vector<int>& column_indices) {
+    if (!executor) {
+      return Future<RecordBatchGenerator>::MakeFinished(
+          ReadOneRowGroup(self, row_group, column_indices));
+    }
+    // If we have an executor, then force transfer (even if I/O was complete)
+    return ::arrow::DeferNotOk(
+        executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+  }
+
+  static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
+      std::shared_ptr<FileReaderImpl> self, const int row_group,
+      const std::vector<int>& column_indices) {
+    std::shared_ptr<::arrow::Table> table;
+    // Skips bound checks/pre-buffering, since we've done that already
+    RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
+    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+    ::arrow::RecordBatchVector batches;
+    while (true) {
+      std::shared_ptr<::arrow::RecordBatch> batch;
+      RETURN_NOT_OK(table_reader->ReadNext(&batch));
+      if (!batch) {
+        break;
+      }
+      batches.push_back(batch);
+    }
+    return ::arrow::MakeVectorGenerator(std::move(batches));
+  }
+
+  std::shared_ptr<FileReaderImpl> arrow_reader_;
+  ::arrow::internal::Executor* executor_;
+  std::vector<int> row_groups_;
+  std::vector<int> column_indices_;
+  size_t index_;
+};
+
+::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+                                        const std::vector<int> row_group_indices,
+                                        const std::vector<int> column_indices,
+                                        ::arrow::internal::Executor* executor) {
+  RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
+  if (reader_properties_.pre_buffer()) {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
+                       reader_properties_.cache_options());
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+  ::arrow::AsyncGenerator<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>

Review comment:
       Use `auto`?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +980,102 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch generators (where each sub-generator is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using RecordBatchGenerator =
+      ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+      : arrow_reader_(std::move(arrow_reader)),
+        executor_(executor),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)),
+        index_(0) {}
+
+  ::arrow::Future<RecordBatchGenerator> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
+    }
+    int row_group = row_groups_[index_++];
+    std::vector<int> column_indices = column_indices_;
+    auto reader = arrow_reader_;
+    if (!reader->properties().pre_buffer()) {
+      return SubmitRead(executor_, reader, row_group, column_indices);
+    }
+    auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+    if (executor_) ready = executor_->Transfer(ready);
+    return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {

Review comment:
       Can you open a JIRA to reuse @westonpace's work to always transfer the future? (We don't want CPU-heavy Parquet decoding to happen on an IO thread.)

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -289,21 +358,94 @@ class SerializedFile : public ParquetFileReader::Contents {
           "Parquet magic bytes not found in footer. Either the file is corrupted or this "
           "is not a parquet file.");
     }
+    // Both encrypted/unencrypted footers have the same footer length check.
+    uint32_t metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
+        reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
+        kFooterSize);
+    if (metadata_len > source_size_ - kFooterSize) {
+      throw ParquetInvalidOrCorruptedFileException(
+          "Parquet file size is ", source_size_,
+          " bytes, smaller than the size reported by footer's (", metadata_len, "bytes)");
+    }
+    return metadata_len;
+  }
+
+  // Does not throw.
+  ::arrow::Future<> ParseMetaDataAsync() {
+    int64_t footer_read_size;
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    footer_read_size = GetFooterReadSize();
+    END_PARQUET_CATCH_EXCEPTIONS
+    // Assumes this is kept alive externally
+    return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
+        .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+                  -> ::arrow::Future<> {
+          uint32_t metadata_len;
+          BEGIN_PARQUET_CATCH_EXCEPTIONS
+          metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
+          END_PARQUET_CATCH_EXCEPTIONS
+          int64_t metadata_start = source_size_ - kFooterSize - metadata_len;
+
+          std::shared_ptr<::arrow::Buffer> metadata_buffer;
+          if (footer_read_size >= (metadata_len + kFooterSize)) {
+            metadata_buffer =
+                SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize,
+                            metadata_len);
+            return ParseMaybeEncryptedMetaDataAsync(footer_buffer,
+                                                    std::move(metadata_buffer),
+                                                    footer_read_size, metadata_len);
+          }
+          return source_->ReadAsync(metadata_start, metadata_len)
+              .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+                return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer,
+                                                        footer_read_size, metadata_len);
+              });
+        });
+  }
 
-    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
+  // Continuation
+  ::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
+      std::shared_ptr<::arrow::Buffer> footer_buffer,
+      std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
+      uint32_t metadata_len) {
+    // Parse the footer depending on encryption type
+    const bool is_encrypted_footer =
+        memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0;
+    if (is_encrypted_footer) {
       // Encrypted file with Encrypted footer.
-      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
-      return;
+      std::pair<int64_t, uint32_t> read_size;
+      BEGIN_PARQUET_CATCH_EXCEPTIONS
+      read_size =
+          ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len);
+      END_PARQUET_CATCH_EXCEPTIONS
+      // Read the actual footer
+      int64_t metadata_start = read_size.first;
+      metadata_len = read_size.second;
+      return source_->ReadAsync(metadata_start, metadata_len)
+          .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+            // Continue and read the file footer
+            return ParseMetaDataAsync(std::move(footer_buffer), metadata_buffer,
+                                      footer_read_size, metadata_len,
+                                      is_encrypted_footer);
+          });
     }
+    return ParseMetaDataAsync(std::move(footer_buffer), std::move(metadata_buffer),
+                              footer_read_size, metadata_len, is_encrypted_footer);
+  }
 
-    // No encryption or encryption with plaintext footer mode.
-    std::shared_ptr<Buffer> metadata_buffer;
-    uint32_t metadata_len, read_metadata_len;
-    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
-                                 &metadata_len, &read_metadata_len);
-
+  // Continuation
+  ::arrow::Status ParseMetaDataAsync(std::shared_ptr<::arrow::Buffer> footer_buffer,
+                                     std::shared_ptr<::arrow::Buffer> metadata_buffer,
+                                     int64_t footer_read_size, uint32_t metadata_len,
+                                     const bool is_encrypted_footer) {

Review comment:
       `footer_buffer` and `footer_read_size` don't seem to be used here?

##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -2331,6 +2330,63 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
   ASSERT_EQ(actual_batch->num_rows(), num_rows);
 }
 
+TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
+  ArrowReaderProperties properties = default_arrow_reader_properties();
+  const int num_rows = 1024;
+  const int row_group_size = 512;
+  const int num_columns = 2;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
+
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+  std::shared_ptr<FileReader> reader;
+  {
+    std::unique_ptr<FileReader> unique_reader;
+    FileReaderBuilder builder;
+    ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+    ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
+    reader = std::move(unique_reader);
+  }
+
+  auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch,
+                          int num_columns, int num_rows) {
+    ASSERT_NE(batch, nullptr);
+    ASSERT_EQ(batch->num_columns(), num_columns);
+    ASSERT_EQ(batch->num_rows(), num_rows);
+  };
+  {
+    ASSERT_OK_AND_ASSIGN(auto batch_generator,
+                         reader->GetRecordBatchGenerator(reader, {0, 1}, {0, 1}));
+    auto fut1 = batch_generator();
+    auto fut2 = batch_generator();
+    auto fut3 = batch_generator();
+    ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result());
+    ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result());
+    ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result());
+    ASSERT_EQ(batch3, nullptr);
+    check_batches(batch1, num_columns, row_group_size);
+    check_batches(batch2, num_columns, row_group_size);
+  }
+  {
+    // No columns case
+    ASSERT_OK_AND_ASSIGN(auto batch_generator,
+                         reader->GetRecordBatchGenerator(reader, {0, 1}, {}));

Review comment:
       So you always have to pass columns explicitly?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org