You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/20 14:28:07 UTC
[arrow-rs] branch master updated: Avoid large over allocate buffer in async reader (#2537)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 98de2e3cc Avoid large over allocate buffer in async reader (#2537)
98de2e3cc is described below
commit 98de2e3ccc9f74760de1edf7dc1f4e1a820f9acf
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sat Aug 20 22:28:02 2022 +0800
Avoid large over allocate buffer in async reader (#2537)
---
parquet/src/arrow/arrow_reader/mod.rs | 4 ++++
parquet/src/arrow/async_reader.rs | 36 ++++++++++++++++++++++++++++++++++-
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 74fff9935..b7c2db255 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -115,7 +115,11 @@ impl<T> ArrowReaderBuilder<T> {
}
/// Set the size of [`RecordBatch`] to produce. Defaults to 1024
+ /// If the batch_size is greater than the file row count, the file row count is used instead.
pub fn with_batch_size(self, batch_size: usize) -> Self {
+ // Try to avoid allocating a large buffer
+ let batch_size =
+ batch_size.min(self.metadata.file_metadata().num_rows() as usize);
Self { batch_size, ..self }
}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b0d9143d6..201f2afcf 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -329,6 +329,10 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
None => (0..self.metadata.row_groups().len()).collect(),
};
+ // Try to avoid allocating a large buffer
+ let batch_size = self
+ .batch_size
+ .min(self.metadata.file_metadata().num_rows() as usize);
let reader = ReaderFactory {
input: self.input.0,
filter: self.filter,
@@ -338,7 +342,7 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
Ok(ParquetRecordBatchStream {
metadata: self.metadata,
- batch_size: self.batch_size,
+ batch_size,
row_groups,
projection: self.projection,
selection: self.selection,
@@ -1133,4 +1137,34 @@ mod tests {
assert_eq!(&requests[..], &expected_page_requests)
}
+
+ #[tokio::test]
+ async fn test_batch_size_overallocate() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ // `alltypes_plain.parquet` only has 8 rows
+ let path = format!("{}/alltypes_plain.parquet", testdata);
+ let data = Bytes::from(std::fs::read(path).unwrap());
+
+ let metadata = parse_metadata(&data).unwrap();
+ let file_rows = metadata.file_metadata().num_rows() as usize;
+ let metadata = Arc::new(metadata);
+
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+
+ let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+ .await
+ .unwrap();
+
+ let stream = builder
+ .with_projection(ProjectionMask::all())
+ .with_batch_size(1024)
+ .build()
+ .unwrap();
+ assert_ne!(1024, file_rows);
+ assert_eq!(stream.batch_size, file_rows as usize);
+ }
}