You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/14 14:00:11 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #11616: ARROW-14577: [C++] Enable fine grained IO for async IPC reader

lidavidm commented on a change in pull request #11616:
URL: https://github.com/apache/arrow/pull/11616#discussion_r784863857



##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -2715,6 +2727,133 @@ TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFieldsWithIoMerged) {
   GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4});
 }
 
+constexpr static int kNumBatches = 10;
+// It can be difficult to know the exact size of the schema.  Instead we just make the
+// row data big enough that we can easily identify if a read is for a schema or for
+// row data.
+//
+// This needs to be large enough to space record batches kDefaultHoleSizeLimit bytes apart
+// and also large enough that record batch data is more than kMaxMetadataSizeBytes bytes
+constexpr static int kRowsPerBatch = 1000;
+constexpr static int64_t kMaxMetadataSizeBytes = 1 << 13;
+// There are always 2 reads when the file is opened
+constexpr static int kNumReadsOnOpen = 2;
+
+class PreBufferingTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override {
+    file_buffer_ = MakeBooleanInt32Int64File(kRowsPerBatch, kNumBatches);
+  }
+
+  void OpenReader() {
+    buffer_reader_ = make_unique<io::BufferReader>(file_buffer_);
+    tracked_ = std::make_shared<TrackedRandomAccessFile>(buffer_reader_.get());
+    auto read_options = IpcReadOptions::Defaults();
+    if (ReadsArePlugged()) {
+      // This will ensure that all reads get globbed together into one large read
+      read_options.pre_buffer_cache_options.hole_size_limit =
+          std::numeric_limits<int64_t>::max() - 1;
+      read_options.pre_buffer_cache_options.range_size_limit =
+          std::numeric_limits<int64_t>::max();
+    }
+    ASSERT_OK_AND_ASSIGN(reader_, RecordBatchFileReader::Open(tracked_, read_options));
+  }
+
+  bool ReadsArePlugged() { return GetParam(); }
+
+  std::vector<int> AllBatchIndices() {
+    std::vector<int> all_batch_indices(kNumBatches);
+    std::iota(all_batch_indices.begin(), all_batch_indices.end(), 0);
+    return all_batch_indices;
+  }
+
+  void AssertMetadataLoaded(std::vector<int> batch_indices) {
+    if (batch_indices.size() == 0) {
+      batch_indices = AllBatchIndices();
+    }
+    const auto& read_ranges = tracked_->get_read_ranges();
+    if (ReadsArePlugged()) {
+      // The read should have arrived as one large read
+      ASSERT_EQ(kNumReadsOnOpen + 1, read_ranges.size());
+      if (batch_indices.size() > 1) {
+        ASSERT_GT(read_ranges[kNumReadsOnOpen].length, kMaxMetadataSizeBytes);
+      }
+    } else {
+      // We should get many small reads of metadata only
+      ASSERT_EQ(batch_indices.size() + kNumReadsOnOpen, read_ranges.size());
+      for (const auto& read_range : read_ranges) {
+        ASSERT_LT(read_range.length, kMaxMetadataSizeBytes);
+      }
+    }
+  }
+
+  std::vector<std::shared_ptr<RecordBatch>> LoadExpected() {
+    auto buffer_reader = make_unique<io::BufferReader>(file_buffer_);
+    auto read_options = IpcReadOptions::Defaults();
+    EXPECT_OK_AND_ASSIGN(auto reader,
+                         RecordBatchFileReader::Open(buffer_reader.get(), read_options));
+    std::vector<std::shared_ptr<RecordBatch>> expected_batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i));
+      expected_batches.push_back(expected_batch);
+    }
+    return expected_batches;
+  }
+
+  void CheckFileRead(int num_indices_pre_buffered) {
+    auto expected_batches = LoadExpected();
+    const std::vector<io::ReadRange>& read_ranges = tracked_->get_read_ranges();
+    std::size_t starting_reads = read_ranges.size();
+    for (int i = 0; i < reader_->num_record_batches(); i++) {
+      ASSERT_OK_AND_ASSIGN(auto next_batch, reader_->ReadRecordBatch(i));
+      AssertBatchesEqual(*expected_batches[i], *next_batch);
+    }
+    int metadata_reads = 0;
+    int data_reads = 0;
+    for (std::size_t i = starting_reads; i < read_ranges.size(); i++) {
+      if (read_ranges[i].length > kMaxMetadataSizeBytes) {

Review comment:
       It'd be good to revisit this in the context of adding column/batch statistics for filter pushdown as well. At that point, though, we'd basically be Parquet/ORC without any of the fancy data encodings (which might be exactly what we're after).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org