Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/20 14:28:07 UTC

[arrow-rs] branch master updated: Avoid large over allocate buffer in async reader (#2537)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 98de2e3cc Avoid large over allocate buffer in async reader (#2537)
98de2e3cc is described below

commit 98de2e3ccc9f74760de1edf7dc1f4e1a820f9acf
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sat Aug 20 22:28:02 2022 +0800

    Avoid large over allocate buffer in async reader (#2537)
---
 parquet/src/arrow/arrow_reader/mod.rs |  4 ++++
 parquet/src/arrow/async_reader.rs     | 36 ++++++++++++++++++++++++++++++++++-
 2 files changed, 39 insertions(+), 1 deletion(-)
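The change itself is a one-line clamp, applied in both the sync builder and the async stream builder. As a minimal sketch of the idea (simplified names, not the exact library code): when the requested batch size exceeds the file's total row count, it is capped at that row count, so a small file never triggers a large up-front buffer allocation.

    // Minimal sketch of the clamp this commit introduces (simplified,
    // not the exact library code): cap the requested batch size at the
    // file's total row count so small files never force a large
    // up-front buffer allocation.
    fn clamp_batch_size(requested: usize, file_num_rows: i64) -> usize {
        requested.min(file_num_rows as usize)
    }

    fn main() {
        // An 8-row file with the default batch size of 1024 now
        // produces 8-row batches instead of reserving space for 1024:
        assert_eq!(clamp_batch_size(1024, 8), 8);
        // Files with more rows than the batch size are unaffected:
        assert_eq!(clamp_batch_size(1024, 1_000_000), 1024);
    }

The actual diff follows.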

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 74fff9935..b7c2db255 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -115,7 +115,11 @@ impl<T> ArrowReaderBuilder<T> {
     }
 
     /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
+    /// If `batch_size` exceeds the file's row count, the row count is used instead.
     pub fn with_batch_size(self, batch_size: usize) -> Self {
+        // Avoid allocating a buffer larger than the file can fill
+        let batch_size =
+            batch_size.min(self.metadata.file_metadata().num_rows() as usize);
         Self { batch_size, ..self }
     }
 
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b0d9143d6..201f2afcf 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -329,6 +329,10 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        // Avoid allocating a buffer larger than the file can fill
+        let batch_size = self
+            .batch_size
+            .min(self.metadata.file_metadata().num_rows() as usize);
         let reader = ReaderFactory {
             input: self.input.0,
             filter: self.filter,
@@ -338,7 +342,7 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
 
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
-            batch_size: self.batch_size,
+            batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
@@ -1133,4 +1137,34 @@ mod tests {
 
         assert_eq!(&requests[..], &expected_page_requests)
     }
+
+    #[tokio::test]
+    async fn test_batch_size_overallocate() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        // `alltypes_plain.parquet` only has 8 rows
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let file_rows = metadata.file_metadata().num_rows() as usize;
+        let metadata = Arc::new(metadata);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+
+        let stream = builder
+            .with_projection(ProjectionMask::all())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+        assert_ne!(1024, file_rows);
+        assert_eq!(stream.batch_size, file_rows as usize);
+    }
 }
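For context, a hedged caller-side sketch of how the clamp behaves after this commit. It assumes the parquet crate's `async` feature, a Tokio runtime, and the futures crate, and relies on `tokio::fs::File` satisfying `AsyncFileReader` via the crate's blanket implementation; the file path is illustrative.

    use futures::TryStreamExt;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Any small Parquet file will do; the path is illustrative.
        let file = tokio::fs::File::open("alltypes_plain.parquet").await?;
        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
        // Request 1024 rows per batch; with this commit the stream
        // clamps that to the file's row count (8 for this file), so
        // no oversized batch buffer is allocated.
        let stream = builder.with_batch_size(1024).build()?;
        let batches: Vec<_> = stream.try_collect().await?;
        println!("read {} batch(es)", batches.len());
        Ok(())
    }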