You are viewing a plain text version of this content. (The canonical link to the original archived message was a hyperlink in the HTML version and is not preserved in this plain text rendering.)
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/15 17:12:37 UTC
[arrow-rs] branch master updated: Add more tests of RecordReader Batch Size Edge Cases (#2025) (#2032)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new a7181ddbd Add more tests of RecordReader Batch Size Edge Cases (#2025) (#2032)
a7181ddbd is described below
commit a7181ddbda801ba021398fb264cda394d64bff85
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Jul 15 13:12:32 2022 -0400
Add more tests of RecordReader Batch Size Edge Cases (#2025) (#2032)
* Add more tests of #2025
* Update parquet/src/arrow/arrow_reader.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
parquet/src/arrow/arrow_reader.rs | 35 +++++++++++++++++++++++++---------
parquet/src/arrow/record_reader/mod.rs | 3 ++-
2 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index f93488f75..ebbb864d6 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -1529,8 +1529,7 @@ mod tests {
assert_eq!(total_rows, expected_rows);
}
- #[test]
- fn test_row_group_exact_multiple() {
+ fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
let schema = Arc::new(Schema::new(vec![Field::new(
"list",
ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Int32, true))),
@@ -1544,14 +1543,14 @@ mod tests {
schema.clone(),
Some(
WriterProperties::builder()
- .set_max_row_group_size(8)
+ .set_max_row_group_size(row_group_size)
.build(),
),
)
.unwrap();
for _ in 0..2 {
- let mut list_builder = ListBuilder::new(Int32Builder::new(10));
- for _ in 0..10 {
+ let mut list_builder = ListBuilder::new(Int32Builder::new(batch_size));
+ for _ in 0..(batch_size) {
list_builder.append(true).unwrap();
}
let batch = RecordBatch::try_new(
@@ -1564,9 +1563,27 @@ mod tests {
writer.close().unwrap();
let mut file_reader = ParquetFileArrowReader::try_new(Bytes::from(buf)).unwrap();
- let mut record_reader = file_reader.get_record_reader(8).unwrap();
- assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
- assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
- assert_eq!(4, record_reader.next().unwrap().unwrap().num_rows());
+ let mut record_reader = file_reader.get_record_reader(batch_size).unwrap();
+ assert_eq!(
+ batch_size,
+ record_reader.next().unwrap().unwrap().num_rows()
+ );
+ assert_eq!(
+ batch_size,
+ record_reader.next().unwrap().unwrap().num_rows()
+ );
+ }
+
+ #[test]
+ fn test_row_group_exact_multiple() {
+ use crate::arrow::record_reader::MIN_BATCH_SIZE;
+ test_row_group_batch(8, 8);
+ test_row_group_batch(10, 8);
+ test_row_group_batch(8, 10);
+ test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE);
+ test_row_group_batch(MIN_BATCH_SIZE + 1, MIN_BATCH_SIZE);
+ test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE + 1);
+ test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
+ test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
}
}
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 30324fbe3..d2720aede 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -38,7 +38,8 @@ use crate::schema::types::ColumnDescPtr;
pub(crate) mod buffer;
mod definition_levels;
-const MIN_BATCH_SIZE: usize = 1024;
+/// The minimum number of levels read when reading a repeated field
+pub(crate) const MIN_BATCH_SIZE: usize = 1024;
/// A `RecordReader` is a stateful column reader that delimits semantic records.
pub type RecordReader<T> =