You are viewing a plain text version of this content. The canonical (linked) version is available in the original mailing-list archive.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 12:13:20 UTC
[arrow-rs] branch master updated: Don't split record across pages (#3680) (#4327)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 23607fe66 Don't split record across pages (#3680) (#4327)
23607fe66 is described below
commit 23607fe669da1939eedfb043e8bc5ade657cfee0
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 1 13:13:14 2023 +0100
Don't split record across pages (#3680) (#4327)
---
parquet/src/column/writer/mod.rs | 64 +++++++++++++++++++-----------------
parquet/tests/arrow_writer_layout.rs | 43 ++++++++++++++++++++++++
2 files changed, 77 insertions(+), 30 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 310519f4a..fc5e29b03 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -308,6 +308,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
max: Option<&E::T>,
distinct_count: Option<u64>,
) -> Result<usize> {
+ // Check if number of definition levels is the same as number of repetition levels.
+ if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
+ if def.len() != rep.len() {
+ return Err(general_err!(
+ "Inconsistent length of definition and repetition levels: {} != {}",
+ def.len(),
+ rep.len()
+ ));
+ }
+ }
+
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can be well above the limit.
//
@@ -323,10 +334,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
None => values.len(),
};
- // Find out number of batches to process.
- let write_batch_size = self.props.write_batch_size();
- let num_batches = num_levels / write_batch_size;
-
// If only computing chunk-level statistics compute them here, page-level statistics
// are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
// [`Self::add_data_page`]
@@ -374,27 +381,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let mut values_offset = 0;
let mut levels_offset = 0;
- for _ in 0..num_batches {
+ let base_batch_size = self.props.write_batch_size();
+ while levels_offset < num_levels {
+ let mut end_offset = num_levels.min(levels_offset + base_batch_size);
+
+ // Split at record boundary
+ if let Some(r) = rep_levels {
+ while end_offset < r.len() && r[end_offset] != 0 {
+ end_offset += 1;
+ }
+ }
+
values_offset += self.write_mini_batch(
values,
values_offset,
value_indices,
- write_batch_size,
- def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
- rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
+ end_offset - levels_offset,
+ def_levels.map(|lv| &lv[levels_offset..end_offset]),
+ rep_levels.map(|lv| &lv[levels_offset..end_offset]),
)?;
- levels_offset += write_batch_size;
+ levels_offset = end_offset;
}
- values_offset += self.write_mini_batch(
- values,
- values_offset,
- value_indices,
- num_levels - levels_offset,
- def_levels.map(|lv| &lv[levels_offset..]),
- rep_levels.map(|lv| &lv[levels_offset..]),
- )?;
-
// Return total number of values processed.
Ok(values_offset)
}
@@ -522,18 +530,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
) -> Result<usize> {
- // Check if number of definition levels is the same as number of repetition
- // levels.
- if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
- if def.len() != rep.len() {
- return Err(general_err!(
- "Inconsistent length of definition and repetition levels: {} != {}",
- def.len(),
- rep.len()
- ));
- }
- }
-
// Process definition levels and determine how many values to write.
let values_to_write = if self.descr.max_def_level() > 0 {
let levels = def_levels.ok_or_else(|| {
@@ -569,6 +565,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
)
})?;
+ if !levels.is_empty() && levels[0] != 0 {
+ return Err(general_err!(
+ "Write must start at a record boundary, got non-zero repetition level of {}",
+ levels[0]
+ ));
+ }
+
// Count the occasions where we start a new row
for &level in levels {
self.page_metrics.num_buffered_rows += (level == 0) as u32
@@ -2255,6 +2258,7 @@ mod tests {
let mut buf: Vec<i16> = Vec::new();
let rep_levels = if max_rep_level > 0 {
random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
+ buf[0] = 0; // Must start on record boundary
Some(&buf[..])
} else {
None
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index 142112b7b..3142c8c52 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -19,6 +19,7 @@
use arrow::array::{Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{Int32Builder, ListBuilder};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
@@ -502,3 +503,45 @@ fn test_string() {
},
});
}
+
+#[test]
+fn test_list() {
+ let mut list = ListBuilder::new(Int32Builder::new());
+ for _ in 0..200 {
+ let values = list.values();
+ for i in 0..8 {
+ values.append_value(i);
+ }
+ list.append(true);
+ }
+ let array = Arc::new(list.finish()) as _;
+
+ let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+ let props = WriterProperties::builder()
+ .set_dictionary_enabled(false)
+ .set_data_page_row_count_limit(20)
+ .set_write_batch_size(3)
+ .build();
+
+ // Test rows not split across pages
+ do_test(LayoutTest {
+ props,
+ batches: vec![batch],
+ layout: Layout {
+ row_groups: vec![RowGroup {
+ columns: vec![ColumnChunk {
+ pages: (0..10)
+ .map(|_| Page {
+ rows: 20,
+ page_header_size: 34,
+ compressed_size: 672,
+ encoding: Encoding::PLAIN,
+ page_type: PageType::DATA_PAGE,
+ })
+ .collect(),
+ dictionary_page: None,
+ }],
+ }],
+ },
+ });
+}