Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 12:13:20 UTC

[arrow-rs] branch master updated: Don't split record across pages (#3680) (#4327)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 23607fe66 Don't split record across pages (#3680) (#4327)
23607fe66 is described below

commit 23607fe669da1939eedfb043e8bc5ade657cfee0
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 1 13:13:14 2023 +0100

    Don't split record across pages (#3680) (#4327)
---
 parquet/src/column/writer/mod.rs     | 64 +++++++++++++++++++-----------------
 parquet/tests/arrow_writer_layout.rs | 43 ++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 30 deletions(-)
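
The core of the change is the batching loop: rather than slicing the level
buffers into fixed write_batch_size chunks, each chunk's end offset is now
advanced to the next record boundary (a repetition level of 0), so a record
is never split across mini-batches and hence never across data pages. A
minimal standalone sketch of that logic (split_offsets is a hypothetical
helper extracted for illustration, not part of the patch):

    /// Compute batch end offsets that never split a record. A record
    /// starts wherever the repetition level is 0, so a batch may only
    /// end just before such a position (or at the end of the input).
    fn split_offsets(rep_levels: &[i16], base_batch_size: usize) -> Vec<usize> {
        let mut offsets = Vec::new();
        let mut levels_offset = 0;
        while levels_offset < rep_levels.len() {
            let mut end_offset = rep_levels.len().min(levels_offset + base_batch_size);
            // Extend the batch until the next record boundary
            while end_offset < rep_levels.len() && rep_levels[end_offset] != 0 {
                end_offset += 1;
            }
            offsets.push(end_offset);
            levels_offset = end_offset;
        }
        offsets
    }

    fn main() {
        // Two records of four values each; rep level 0 marks a record start.
        let rep: [i16; 8] = [0, 1, 1, 1, 0, 1, 1, 1];
        // With a base batch size of 3, the first batch is extended from
        // offset 3 to 4 so the second record is not split.
        assert_eq!(split_offsets(&rep, 3), vec![4, 8]);
    }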

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 310519f4a..fc5e29b03 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -308,6 +308,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         max: Option<&E::T>,
         distinct_count: Option<u64>,
     ) -> Result<usize> {
+        // Check if number of definition levels is the same as number of repetition levels.
+        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
+            if def.len() != rep.len() {
+                return Err(general_err!(
+                    "Inconsistent length of definition and repetition levels: {} != {}",
+                    def.len(),
+                    rep.len()
+                ));
+            }
+        }
+
         // We check for DataPage limits only after we have inserted the values. If a user
         // writes a large number of values, the DataPage size can be well above the limit.
         //
@@ -323,10 +334,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             None => values.len(),
         };
 
-        // Find out number of batches to process.
-        let write_batch_size = self.props.write_batch_size();
-        let num_batches = num_levels / write_batch_size;
-
         // If only computing chunk-level statistics compute them here, page-level statistics
         // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
         // [`Self::add_data_page`]
@@ -374,27 +381,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         let mut values_offset = 0;
         let mut levels_offset = 0;
-        for _ in 0..num_batches {
+        let base_batch_size = self.props.write_batch_size();
+        while levels_offset < num_levels {
+            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
+
+            // Split at record boundary
+            if let Some(r) = rep_levels {
+                while end_offset < r.len() && r[end_offset] != 0 {
+                    end_offset += 1;
+                }
+            }
+
             values_offset += self.write_mini_batch(
                 values,
                 values_offset,
                 value_indices,
-                write_batch_size,
-                def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
-                rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
+                end_offset - levels_offset,
+                def_levels.map(|lv| &lv[levels_offset..end_offset]),
+                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
             )?;
-            levels_offset += write_batch_size;
+            levels_offset = end_offset;
         }
 
-        values_offset += self.write_mini_batch(
-            values,
-            values_offset,
-            value_indices,
-            num_levels - levels_offset,
-            def_levels.map(|lv| &lv[levels_offset..]),
-            rep_levels.map(|lv| &lv[levels_offset..]),
-        )?;
-
         // Return total number of values processed.
         Ok(values_offset)
     }
@@ -522,18 +530,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
-        // Check if number of definition levels is the same as number of repetition
-        // levels.
-        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
-            if def.len() != rep.len() {
-                return Err(general_err!(
-                    "Inconsistent length of definition and repetition levels: {} != {}",
-                    def.len(),
-                    rep.len()
-                ));
-            }
-        }
-
         // Process definition levels and determine how many values to write.
         let values_to_write = if self.descr.max_def_level() > 0 {
             let levels = def_levels.ok_or_else(|| {
@@ -569,6 +565,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 )
             })?;
 
+            if !levels.is_empty() && levels[0] != 0 {
+                return Err(general_err!(
+                    "Write must start at a record boundary, got non-zero repetition level of {}",
+                    levels[0]
+                ));
+            }
+
             // Count the occasions where we start a new row
             for &level in levels {
                 self.page_metrics.num_buffered_rows += (level == 0) as u32
@@ -2255,6 +2258,7 @@ mod tests {
         let mut buf: Vec<i16> = Vec::new();
         let rep_levels = if max_rep_level > 0 {
             random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
+            buf[0] = 0; // Must start on record boundary
             Some(&buf[..])
         } else {
             None
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index 142112b7b..3142c8c52 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -19,6 +19,7 @@
 
 use arrow::array::{Int32Array, StringArray};
 use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{Int32Builder, ListBuilder};
 use bytes::Bytes;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
 use parquet::arrow::ArrowWriter;
@@ -502,3 +503,45 @@ fn test_string() {
         },
     });
 }
+
+#[test]
+fn test_list() {
+    let mut list = ListBuilder::new(Int32Builder::new());
+    for _ in 0..200 {
+        let values = list.values();
+        for i in 0..8 {
+            values.append_value(i);
+        }
+        list.append(true);
+    }
+    let array = Arc::new(list.finish()) as _;
+
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_row_count_limit(20)
+        .set_write_batch_size(3)
+        .build();
+
+    // Test rows not split across pages
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: (0..10)
+                        .map(|_| Page {
+                            rows: 20,
+                            page_header_size: 34,
+                            compressed_size: 672,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+}
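
The new test_list pins the exact page layout produced by the fix: with a
20-row page limit and a write batch size of 3 that deliberately does not
align with the 8-value rows, all 200 list rows land in 10 pages of 20 whole
rows each. The behaviour can also be exercised end to end through the
public API, roughly as below (a sketch mirroring the test's setup; only the
round-trip row count is asserted here):

    use std::sync::Arc;

    use arrow_array::builder::{Int32Builder, ListBuilder};
    use arrow_array::RecordBatch;
    use bytes::Bytes;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // 200 rows, each a list of 8 values, as in test_list.
        let mut list = ListBuilder::new(Int32Builder::new());
        for _ in 0..200 {
            for i in 0..8 {
                list.values().append_value(i);
            }
            list.append(true);
        }
        let array = Arc::new(list.finish()) as _;
        let batch = RecordBatch::try_from_iter([("col", array)])?;

        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_data_page_row_count_limit(20)
            .set_write_batch_size(3) // misaligned with row boundaries
            .build();

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;

        // With the fix, every data page holds whole rows, and the row
        // count round-trips exactly.
        let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?.build()?;
        let rows: usize = reader
            .map(|batch| batch.map(|b| b.num_rows()))
            .sum::<Result<usize, _>>()?;
        assert_eq!(rows, 200);
        Ok(())
    }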