You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/02 19:33:50 UTC

[arrow-rs] branch master updated: Respect max rowgroup size in Arrow writer (#381)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec3158  Respect max rowgroup size in Arrow writer (#381)
7ec3158 is described below

commit 7ec3158b09d24a63730f7aa63d147fd78e2102a5
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Wed Jun 2 21:31:50 2021 +0200

    Respect max rowgroup size in Arrow writer (#381)
    
    * Respect max rowgroup size in Arrow writer
    
    * simplify while loop
    
    * address review feedback
---
 parquet/src/arrow/arrow_writer.rs | 176 ++++++++++++++++++++++++++--------
 parquet/src/arrow/levels.rs       | 193 +++++++++++++++++++++++++++-----------
 parquet/src/file/properties.rs    |   1 +
 3 files changed, 277 insertions(+), 93 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 332b893..69ebce6 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -47,6 +47,8 @@ pub struct ArrowWriter<W: ParquetWriter> {
     ///
     /// The schema is used to verify that each record batch written has the correct schema
     arrow_schema: SchemaRef,
+    /// The length of arrays to write to each row group
+    max_row_group_size: usize,
 }
 
 impl<W: 'static + ParquetWriter> ArrowWriter<W> {
@@ -65,6 +67,8 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
         let mut props = props.unwrap_or_else(|| WriterProperties::builder().build());
         add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
 
+        let max_row_group_size = props.max_row_group_size();
+
         let file_writer = SerializedFileWriter::new(
             writer.try_clone()?,
             schema.root_schema_ptr(),
@@ -74,12 +78,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
         Ok(Self {
             writer: file_writer,
             arrow_schema,
+            max_row_group_size,
         })
     }
 
     /// Write a RecordBatch to writer
     ///
-    /// *NOTE:* The writer currently does not support all Arrow data types
+    /// The writer will slice the `batch` into `max_row_group_size`,
+    /// but if a batch has left-over rows less than the row group size,
+    /// the last row group will have fewer records.
+    /// This is currently a limitation because we close the row group
+    /// instead of keeping it open for the next batch.
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         // validate batch schema against writer's supplied schema
         if self.arrow_schema != batch.schema() {
@@ -87,17 +96,31 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
                 "Record batch schema does not match writer schema".to_string(),
             ));
         }
-        // compute the definition and repetition levels of the batch
-        let batch_level = LevelInfo::new_from_batch(batch);
-        let mut row_group_writer = self.writer.next_row_group()?;
-        for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
-            let mut levels = batch_level.calculate_array_levels(array, field);
-            // Reverse levels as we pop() them when writing arrays
-            levels.reverse();
-            write_leaves(&mut row_group_writer, array, &mut levels)?;
+        // Track the number of rows being written in the batch.
+        // We currently do not have a way of slicing nested arrays, thus we
+        // track this manually.
+        let num_rows = batch.num_rows();
+        let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
+        let min_batch = num_rows.min(self.max_row_group_size);
+        for batch_index in 0..batches {
+            // Determine the offset and length of arrays
+            let offset = batch_index * min_batch;
+            let length = (num_rows - offset).min(self.max_row_group_size);
+
+            // Compute the definition and repetition levels of the batch
+            let batch_level = LevelInfo::new(offset, length);
+            let mut row_group_writer = self.writer.next_row_group()?;
+            for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
+                let mut levels = batch_level.calculate_array_levels(array, field);
+                // Reverse levels as we pop() them when writing arrays
+                levels.reverse();
+                write_leaves(&mut row_group_writer, array, &mut levels)?;
+            }
+
+            self.writer.close_row_group(row_group_writer)?;
         }
 
-        self.writer.close_row_group(row_group_writer)
+        Ok(())
     }
 
     /// Close and finalize the underlying Parquet writer
@@ -209,16 +232,19 @@ fn write_leaf(
     levels: LevelInfo,
 ) -> Result<i64> {
     let indices = levels.filter_array_indices();
+    // Slice array according to computed offset and length
+    let column = column.slice(levels.offset, levels.length);
     let written = match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
             let values = match column.data_type() {
                 ArrowDataType::Date64 => {
                     // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
                     let array = if let ArrowDataType::Date64 = column.data_type() {
-                        let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+                        let array =
+                            arrow::compute::cast(&column, &ArrowDataType::Date32)?;
                         arrow::compute::cast(&array, &ArrowDataType::Int32)?
                     } else {
-                        arrow::compute::cast(column, &ArrowDataType::Int32)?
+                        arrow::compute::cast(&column, &ArrowDataType::Int32)?
                     };
                     let array = array
                         .as_any()
@@ -240,7 +266,7 @@ fn write_leaf(
                     get_numeric_array_slice::<Int32Type, _>(&array, &indices)
                 }
                 _ => {
-                    let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+                    let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?;
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int32Array>()
@@ -288,7 +314,7 @@ fn write_leaf(
                     get_numeric_array_slice::<Int64Type, _>(&array, &indices)
                 }
                 _ => {
-                    let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
+                    let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?;
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int64Array>()
@@ -866,7 +892,17 @@ mod tests {
         )
         .unwrap();
 
-        roundtrip("test_arrow_writer_complex.parquet", batch);
+        roundtrip(
+            "test_arrow_writer_complex.parquet",
+            batch.clone(),
+            Some(SMALL_SIZE / 2),
+        );
+
+        roundtrip(
+            "test_arrow_writer_complex_small_batch.parquet",
+            batch,
+            Some(SMALL_SIZE / 3),
+        );
     }
 
     #[test]
@@ -904,7 +940,11 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
                 .unwrap();
 
-        roundtrip("test_arrow_writer_complex_mixed.parquet", batch);
+        roundtrip(
+            "test_arrow_writer_complex_mixed.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -936,7 +976,11 @@ mod tests {
         // build a record batch
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 
-        roundtrip("test_arrow_writer_2_level_struct.parquet", batch);
+        roundtrip(
+            "test_arrow_writer_2_level_struct.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -966,7 +1010,11 @@ mod tests {
         // build a record batch
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 
-        roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch);
+        roundtrip(
+            "test_arrow_writer_2_level_struct_non_null.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -998,18 +1046,30 @@ mod tests {
         // build a record batch
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 
-        roundtrip("test_arrow_writer_2_level_struct_mixed_null.parquet", batch);
+        roundtrip(
+            "test_arrow_writer_2_level_struct_mixed_null.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
-    const SMALL_SIZE: usize = 4;
+    const SMALL_SIZE: usize = 7;
 
-    fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
+    fn roundtrip(
+        filename: &str,
+        expected_batch: RecordBatch,
+        max_row_group_size: Option<usize>,
+    ) -> File {
         let file = get_temp_file(filename, &[]);
 
         let mut writer = ArrowWriter::try_new(
             file.try_clone().unwrap(),
             expected_batch.schema(),
-            None,
+            max_row_group_size.map(|size| {
+                WriterProperties::builder()
+                    .set_max_row_group_size(size)
+                    .build()
+            }),
         )
         .expect("Unable to write file");
         writer.write(&expected_batch).unwrap();
@@ -1037,7 +1097,12 @@ mod tests {
         file
     }
 
-    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
+    fn one_column_roundtrip(
+        filename: &str,
+        values: ArrayRef,
+        nullable: bool,
+        max_row_group_size: Option<usize>,
+    ) -> File {
         let schema = Schema::new(vec![Field::new(
             "col",
             values.data_type().clone(),
@@ -1046,7 +1111,7 @@ mod tests {
         let expected_batch =
             RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
 
-        roundtrip(filename, expected_batch)
+        roundtrip(filename, expected_batch, max_row_group_size)
     }
 
     fn values_required<A, I>(iter: I, filename: &str)
@@ -1056,7 +1121,7 @@ mod tests {
     {
         let raw_values: Vec<_> = iter.into_iter().collect();
         let values = Arc::new(A::from(raw_values));
-        one_column_roundtrip(filename, values, false);
+        one_column_roundtrip(filename, values, false, Some(SMALL_SIZE / 2));
     }
 
     fn values_optional<A, I>(iter: I, filename: &str)
@@ -1070,7 +1135,7 @@ mod tests {
             .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
             .collect();
         let optional_values = Arc::new(A::from(optional_raw_values));
-        one_column_roundtrip(filename, optional_values, true);
+        one_column_roundtrip(filename, optional_values, true, Some(SMALL_SIZE / 2));
     }
 
     fn required_and_optional<A, I>(iter: I, filename: &str)
@@ -1085,12 +1150,17 @@ mod tests {
     #[test]
     fn all_null_primitive_single_column() {
         let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
-        one_column_roundtrip("all_null_primitive_single_column", values, true);
+        one_column_roundtrip(
+            "all_null_primitive_single_column",
+            values,
+            true,
+            Some(SMALL_SIZE / 2),
+        );
     }
     #[test]
     fn null_single_column() {
         let values = Arc::new(NullArray::new(SMALL_SIZE));
-        one_column_roundtrip("null_single_column", values, true);
+        one_column_roundtrip("null_single_column", values, true, Some(SMALL_SIZE / 2));
         // null arrays are always nullable, a test with non-nullable nulls fails
     }
 
@@ -1176,7 +1246,7 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip("timestamp_second_single_column", values, false);
+        one_column_roundtrip("timestamp_second_single_column", values, false, Some(3));
     }
 
     #[test]
@@ -1184,7 +1254,12 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip("timestamp_millisecond_single_column", values, false);
+        one_column_roundtrip(
+            "timestamp_millisecond_single_column",
+            values,
+            false,
+            Some(SMALL_SIZE / 2 + 1),
+        );
     }
 
     #[test]
@@ -1192,7 +1267,12 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip("timestamp_microsecond_single_column", values, false);
+        one_column_roundtrip(
+            "timestamp_microsecond_single_column",
+            values,
+            false,
+            Some(SMALL_SIZE / 2 + 2),
+        );
     }
 
     #[test]
@@ -1200,7 +1280,12 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip("timestamp_nanosecond_single_column", values, false);
+        one_column_roundtrip(
+            "timestamp_nanosecond_single_column",
+            values,
+            false,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -1336,7 +1421,12 @@ mod tests {
         builder.append_value(b"1112").unwrap();
         let array = Arc::new(builder.finish());
 
-        one_column_roundtrip("fixed_size_binary_single_column", array, true);
+        one_column_roundtrip(
+            "fixed_size_binary_single_column",
+            array,
+            true,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -1379,7 +1469,7 @@ mod tests {
         let a = ListArray::from(a_list_data);
         let values = Arc::new(a);
 
-        one_column_roundtrip("list_single_column", values, true);
+        one_column_roundtrip("list_single_column", values, true, Some(SMALL_SIZE / 2));
     }
 
     #[test]
@@ -1404,7 +1494,12 @@ mod tests {
         let a = LargeListArray::from(a_list_data);
         let values = Arc::new(a);
 
-        one_column_roundtrip("large_list_single_column", values, true);
+        one_column_roundtrip(
+            "large_list_single_column",
+            values,
+            true,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -1414,7 +1509,7 @@ mod tests {
         let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
 
         let values = Arc::new(s);
-        one_column_roundtrip("struct_single_column", values, false);
+        one_column_roundtrip("struct_single_column", values, false, Some(SMALL_SIZE / 2));
     }
 
     #[test]
@@ -1440,6 +1535,7 @@ mod tests {
         roundtrip(
             "test_arrow_writer_string_dictionary.parquet",
             expected_batch,
+            Some(SMALL_SIZE / 2),
         );
     }
 
@@ -1470,6 +1566,7 @@ mod tests {
         roundtrip(
             "test_arrow_writer_primitive_dictionary.parquet",
             expected_batch,
+            Some(SMALL_SIZE / 2),
         );
     }
 
@@ -1496,6 +1593,7 @@ mod tests {
         roundtrip(
             "test_arrow_writer_string_dictionary_unsigned_index.parquet",
             expected_batch,
+            Some(SMALL_SIZE / 2),
         );
     }
 
@@ -1511,7 +1609,7 @@ mod tests {
             u32::MAX - 1,
             u32::MAX,
         ]));
-        let file = one_column_roundtrip("u32_min_max_single_column", values, false);
+        let file = one_column_roundtrip("u32_min_max_single_column", values, false, None);
 
         // check statistics are valid
         let reader = SerializedFileReader::new(file).unwrap();
@@ -1542,7 +1640,7 @@ mod tests {
             u64::MAX - 1,
             u64::MAX,
         ]));
-        let file = one_column_roundtrip("u64_min_max_single_column", values, false);
+        let file = one_column_roundtrip("u64_min_max_single_column", values, false, None);
 
         // check statistics are valid
         let reader = SerializedFileReader::new(file).unwrap();
@@ -1565,7 +1663,7 @@ mod tests {
     fn statistics_null_counts_only_nulls() {
         // check that null-count statistics for "only NULL"-columns are correct
         let values = Arc::new(UInt64Array::from(vec![None, None]));
-        let file = one_column_roundtrip("null_counts", values, true);
+        let file = one_column_roundtrip("null_counts", values, true, None);
 
         // check statistics are valid
         let reader = SerializedFileReader::new(file).unwrap();
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index be56726..2e95039 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -42,7 +42,6 @@
 
 use arrow::array::{make_array, ArrayRef, StructArray};
 use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
 
 /// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
 ///
@@ -67,6 +66,10 @@ pub(crate) struct LevelInfo {
     pub max_definition: i16,
     /// The type of array represented by this level info
     pub level_type: LevelType,
+    /// The offset of the current level's array
+    pub offset: usize,
+    /// The length of the current level's array
+    pub length: usize,
 }
 
 /// LevelType defines the type of level, and whether it is nullable or not
@@ -91,22 +94,23 @@ impl LevelType {
 }
 
 impl LevelInfo {
-    /// Create a new [LevelInfo] from a record batch.
+    /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset.
     ///
     /// This is a convenience function to populate the starting point of the traversal.
-    pub(crate) fn new_from_batch(batch: &RecordBatch) -> Self {
-        let num_rows = batch.num_rows();
+    pub(crate) fn new(offset: usize, length: usize) -> Self {
         Self {
             // a batch has no definition level yet
-            definition: vec![0; num_rows],
+            definition: vec![0; length],
             // a batch has no repetition as it is not a list
             repetition: None,
             // a batch has sequential offsets, should be num_rows + 1
-            array_offsets: (0..=(num_rows as i64)).collect(),
+            array_offsets: (0..=(length as i64)).collect(),
             // all values at a batch-level are non-null
-            array_mask: vec![true; num_rows],
+            array_mask: vec![true; length],
             max_definition: 0,
             level_type: LevelType::Root,
+            offset,
+            length,
         }
     }
 
@@ -127,16 +131,19 @@ impl LevelInfo {
         array: &ArrayRef,
         field: &Field,
     ) -> Vec<Self> {
-        let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array);
+        let (array_offsets, array_mask) =
+            Self::get_array_offsets_and_masks(array, self.offset, self.length);
         match array.data_type() {
             DataType::Null => vec![Self {
                 definition: self.definition.clone(),
                 repetition: self.repetition.clone(),
-                array_offsets: self.array_offsets.clone(),
+                array_offsets,
                 array_mask,
                 max_definition: self.max_definition.max(1),
                 // Null type is always nullable
                 level_type: LevelType::Primitive(true),
+                offset: self.offset,
+                length: self.length,
             }],
             DataType::Boolean
             | DataType::Int8
@@ -171,6 +178,8 @@ impl LevelInfo {
                 )]
             }
             DataType::List(list_field) | DataType::LargeList(list_field) => {
+                let child_offset = array_offsets[0] as usize;
+                let child_len = *array_offsets.last().unwrap() as usize;
                 // Calculate the list level
                 let list_level = self.calculate_child_levels(
                     array_offsets,
@@ -182,8 +191,11 @@ impl LevelInfo {
                 let array_data = array.data();
                 let child_data = array_data.child_data().get(0).unwrap();
                 let child_array = make_array(child_data.clone());
-                let (child_offsets, child_mask) =
-                    Self::get_array_offsets_and_masks(&child_array);
+                let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
+                    &child_array,
+                    child_offset,
+                    child_len - child_offset,
+                );
 
                 match child_array.data_type() {
                     // TODO: The behaviour of a <list<null>> is untested
@@ -364,14 +376,15 @@ impl LevelInfo {
 
                     if parent_len == 0 {
                         // If the parent length is 0, there won't be a slot for the child
-                        let index = start + nulls_seen;
+                        let index = start + nulls_seen - self.offset;
                         definition.push(self.definition[index]);
                         repetition.push(0);
                         merged_array_mask.push(self.array_mask[index]);
                         nulls_seen += 1;
                     } else {
                         (start..end).for_each(|parent_index| {
-                            let index = parent_index + nulls_seen;
+                            let index = parent_index + nulls_seen - self.offset;
+                            let parent_index = parent_index - self.offset;
 
                             // parent is either defined at this level, or earlier
                             let parent_def = self.definition[index];
@@ -418,6 +431,9 @@ impl LevelInfo {
 
                 debug_assert_eq!(definition.len(), merged_array_mask.len());
 
+                let offset = *array_offsets.first().unwrap() as usize;
+                let length = *array_offsets.last().unwrap() as usize - offset;
+
                 Self {
                     definition,
                     repetition: Some(repetition),
@@ -425,6 +441,8 @@ impl LevelInfo {
                     array_mask: merged_array_mask,
                     max_definition,
                     level_type,
+                    offset: offset + self.offset,
+                    length,
                 }
             }
             (LevelType::List(_), _) => {
@@ -442,7 +460,7 @@ impl LevelInfo {
                     let parent_len = end - start;
 
                     if parent_len == 0 {
-                        let index = start + nulls_seen;
+                        let index = start + nulls_seen - self.offset;
                         definition.push(self.definition[index]);
                         repetition.push(reps[index]);
                         merged_array_mask.push(self.array_mask[index]);
@@ -450,8 +468,8 @@ impl LevelInfo {
                     } else {
                         // iterate through the array, adjusting child definitions for nulls
                         (start..end).for_each(|child_index| {
-                            let index = child_index + nulls_seen;
-                            let child_mask = array_mask[child_index];
+                            let index = child_index + nulls_seen - self.offset;
+                            let child_mask = array_mask[child_index - self.offset];
                             let parent_mask = self.array_mask[index];
                             let parent_def = self.definition[index];
 
@@ -470,6 +488,9 @@ impl LevelInfo {
 
                 debug_assert_eq!(definition.len(), merged_array_mask.len());
 
+                let offset = *array_offsets.first().unwrap() as usize;
+                let length = *array_offsets.last().unwrap() as usize - offset;
+
                 Self {
                     definition,
                     repetition: Some(repetition),
@@ -477,6 +498,8 @@ impl LevelInfo {
                     array_mask: merged_array_mask,
                     max_definition,
                     level_type,
+                    offset: offset + self.offset,
+                    length,
                 }
             }
             (_, LevelType::List(is_nullable)) => {
@@ -539,6 +562,9 @@ impl LevelInfo {
 
                 debug_assert_eq!(definition.len(), merged_array_mask.len());
 
+                let offset = *array_offsets.first().unwrap() as usize;
+                let length = *array_offsets.last().unwrap() as usize - offset;
+
                 Self {
                     definition,
                     repetition: Some(repetition),
@@ -546,6 +572,8 @@ impl LevelInfo {
                     array_mask: merged_array_mask,
                     max_definition,
                     level_type,
+                    offset,
+                    length,
                 }
             }
             (_, _) => {
@@ -583,6 +611,9 @@ impl LevelInfo {
                     array_mask: merged_array_mask,
                     max_definition,
                     level_type,
+                    // Inherit parent offset and length
+                    offset: self.offset,
+                    length: self.length,
                 }
             }
         }
@@ -592,7 +623,11 @@ impl LevelInfo {
     /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained
     ///   from validity bitmap
     /// - List array offsets will be the value offsets, masks are computed from offsets
-    fn get_array_offsets_and_masks(array: &ArrayRef) -> (Vec<i64>, Vec<bool>) {
+    fn get_array_offsets_and_masks(
+        array: &ArrayRef,
+        offset: usize,
+        len: usize,
+    ) -> (Vec<i64>, Vec<bool>) {
         match array.data_type() {
             DataType::Null
             | DataType::Boolean
@@ -622,10 +657,10 @@ impl LevelInfo {
             | DataType::Dictionary(_, _)
             | DataType::Decimal(_, _) => {
                 let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
-                    None => vec![true; array.len()],
+                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+                    None => vec![true; len],
                 };
-                ((0..=(array.len() as i64)).collect(), array_mask)
+                ((0..=(len as i64)).collect(), array_mask)
             }
             DataType::List(_) => {
                 let data = array.data();
@@ -633,31 +668,37 @@ impl LevelInfo {
                 let offsets = offsets
                     .to_vec()
                     .into_iter()
+                    .skip(offset)
+                    .take(len + 1)
                     .map(|v| v as i64)
                     .collect::<Vec<i64>>();
                 let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
-                    None => vec![true; array.len()],
+                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+                    None => vec![true; len],
                 };
                 (offsets, array_mask)
             }
             DataType::LargeList(_) => {
-                let offsets =
-                    unsafe { array.data().buffers()[0].typed_data::<i64>() }.to_vec();
+                let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
+                    .iter()
+                    .skip(offset)
+                    .take(len + 1)
+                    .copied()
+                    .collect();
                 let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
-                    None => vec![true; array.len()],
+                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+                    None => vec![true; len],
                 };
                 (offsets, array_mask)
             }
             DataType::FixedSizeBinary(value_len) => {
                 let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
-                    None => vec![true; array.len()],
+                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+                    None => vec![true; len],
                 };
                 let value_len = *value_len as i64;
                 (
-                    (0..=(array.len() as i64)).map(|v| v * value_len).collect(),
+                    (0..=(len as i64)).map(|v| v * value_len).collect(),
                     array_mask,
                 )
             }
@@ -722,20 +763,14 @@ fn get_bool_array_slice(
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use super::*;
 
-    use arrow::{
-        array::ListArray,
-        array::{Array, ArrayData, Int32Array},
-        buffer::Buffer,
-        datatypes::Schema,
-    };
-    use arrow::{
-        array::{Float32Array, Float64Array, Int16Array},
-        datatypes::ToByteSlice,
-    };
+    use std::sync::Arc;
 
-    use super::*;
+    use arrow::array::*;
+    use arrow::buffer::Buffer;
+    use arrow::datatypes::{Schema, ToByteSlice};
+    use arrow::record_batch::RecordBatch;
 
     #[test]
     fn test_calculate_array_levels_twitter_example() {
@@ -748,6 +783,8 @@ mod tests {
             array_mask: vec![true, true], // both lists defined
             max_definition: 0,
             level_type: LevelType::Root,
+            offset: 0,
+            length: 2,
         };
         // offset into array, each level1 has 2 values
         let array_offsets = vec![0, 2, 4];
@@ -767,6 +804,8 @@ mod tests {
             array_mask: vec![true, true, true, true],
             max_definition: 1,
             level_type: LevelType::List(false),
+            offset: 0,
+            length: 4,
         };
         // the separate asserts make it easier to see what's failing
         assert_eq!(&levels.definition, &expected_levels.definition);
@@ -794,6 +833,8 @@ mod tests {
             array_mask: vec![true; 10],
             max_definition: 2,
             level_type: LevelType::List(false),
+            offset: 0,
+            length: 10,
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -814,6 +855,8 @@ mod tests {
             array_mask: vec![true; 10],
             max_definition: 0,
             level_type: LevelType::Root,
+            offset: 0,
+            length: 10,
         };
         let array_offsets: Vec<i64> = (0..=10).collect();
         let array_mask = vec![true; 10];
@@ -830,6 +873,8 @@ mod tests {
             array_mask,
             max_definition: 1,
             level_type: LevelType::Primitive(false),
+            offset: 0,
+            length: 10,
         };
         assert_eq!(&levels, &expected_levels);
     }
@@ -844,6 +889,8 @@ mod tests {
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
             level_type: LevelType::Root,
+            offset: 0,
+            length: 5,
         };
         let array_offsets: Vec<i64> = (0..=5).collect();
         let array_mask = vec![true, false, true, true, false];
@@ -860,6 +907,8 @@ mod tests {
             array_mask,
             max_definition: 1,
             level_type: LevelType::Primitive(true),
+            offset: 0,
+            length: 5,
         };
         assert_eq!(&levels, &expected_levels);
     }
@@ -875,6 +924,8 @@ mod tests {
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
             level_type: LevelType::Root,
+            offset: 0,
+            length: 5,
         };
         let array_offsets = vec![0, 2, 2, 4, 8, 11];
         let array_mask = vec![true, false, true, true, true];
@@ -905,6 +956,8 @@ mod tests {
             ],
             max_definition: 1,
             level_type: LevelType::List(true),
+            offset: 0,
+            length: 11, // the child has 11 slots
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -936,6 +989,8 @@ mod tests {
             array_mask: vec![false, true, false, true, true],
             max_definition: 1,
             level_type: LevelType::Struct(true),
+            offset: 0,
+            length: 5,
         };
         let array_offsets = vec![0, 2, 2, 4, 8, 11];
         let array_mask = vec![true, false, true, true, true];
@@ -960,6 +1015,8 @@ mod tests {
             ],
             max_definition: 2,
             level_type: LevelType::List(true),
+            offset: 0,
+            length: 11,
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1017,6 +1074,8 @@ mod tests {
             ],
             max_definition: 4,
             level_type: LevelType::List(true),
+            offset: 0,
+            length: 22,
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1042,6 +1101,8 @@ mod tests {
             array_mask: vec![true, true, true, true],
             max_definition: 1,
             level_type: LevelType::Struct(true),
+            offset: 0,
+            length: 4,
         };
         // 0: null ([], but mask is false, so it's not just an empty list)
         // 1: [1, 2, 3]
@@ -1066,6 +1127,8 @@ mod tests {
             array_mask: vec![false, true, true, true, true, true, true, true],
             max_definition: 2,
             level_type: LevelType::List(true),
+            offset: 0,
+            length: 8,
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1113,6 +1176,8 @@ mod tests {
             array_offsets,
             max_definition: 4,
             level_type: LevelType::List(true),
+            offset: 0,
+            length: 16,
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1140,6 +1205,8 @@ mod tests {
             array_mask: vec![true, true, true, true, false, true],
             max_definition: 1,
             level_type: LevelType::Struct(true),
+            offset: 0,
+            length: 6,
         };
         // b's offset and mask
         let b_offsets: Vec<i64> = (0..=6).collect();
@@ -1152,6 +1219,8 @@ mod tests {
             array_mask: vec![true, true, true, false, false, true],
             max_definition: 2,
             level_type: LevelType::Struct(true),
+            offset: 0,
+            length: 6,
         };
         let b_levels = a_levels.calculate_child_levels(
             b_offsets.clone(),
@@ -1171,6 +1240,8 @@ mod tests {
             array_mask: vec![true, false, true, false, false, true],
             max_definition: 3,
             level_type: LevelType::Struct(true),
+            offset: 0,
+            length: 6,
         };
         let c_levels =
             b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true));
@@ -1203,15 +1274,17 @@ mod tests {
         let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
 
         let expected_batch_level = LevelInfo {
-            definition: vec![0; 5],
+            definition: vec![0; 2],
             repetition: None,
-            array_offsets: (0..=5).collect(),
-            array_mask: vec![true, true, true, true, true],
+            array_offsets: (0..=2).collect(),
+            array_mask: vec![true, true],
             max_definition: 0,
             level_type: LevelType::Root,
+            offset: 2,
+            length: 2,
         };
 
-        let batch_level = LevelInfo::new_from_batch(&batch);
+        let batch_level = LevelInfo::new(2, 2);
         assert_eq!(&batch_level, &expected_batch_level);
 
         // calculate the list's level
@@ -1229,14 +1302,14 @@ mod tests {
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![3, 3, 3, 0, 3, 3, 3, 3, 3, 3, 3],
-            repetition: Some(vec![0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1]),
-            array_offsets: vec![0, 1, 3, 3, 6, 10],
-            array_mask: vec![
-                true, true, true, false, true, true, true, true, true, true, true,
-            ],
+            definition: vec![0, 3, 3, 3],
+            repetition: Some(vec![0, 0, 1, 1]),
+            array_offsets: vec![3, 3, 6],
+            array_mask: vec![false, true, true, true],
             max_definition: 3,
             level_type: LevelType::Primitive(true),
+            offset: 3,
+            length: 3,
         };
         assert_eq!(&list_level.definition, &expected_level.definition);
         assert_eq!(&list_level.repetition, &expected_level.repetition);
@@ -1320,9 +1393,11 @@ mod tests {
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
             level_type: LevelType::Root,
+            offset: 0,
+            length: 5,
         };
 
-        let batch_level = LevelInfo::new_from_batch(&batch);
+        let batch_level = LevelInfo::new(0, 5);
         assert_eq!(&batch_level, &expected_batch_level);
 
         // calculate the list's level
@@ -1347,6 +1422,8 @@ mod tests {
             array_mask: vec![true, true, true, true, true],
             max_definition: 1,
             level_type: LevelType::Primitive(false),
+            offset: 0,
+            length: 5,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1360,6 +1437,8 @@ mod tests {
             array_mask: vec![true, false, false, true, true],
             max_definition: 1,
             level_type: LevelType::Primitive(true),
+            offset: 0,
+            length: 5,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1373,6 +1452,8 @@ mod tests {
             array_mask: vec![false, false, false, true, false],
             max_definition: 2,
             level_type: LevelType::Primitive(true),
+            offset: 0,
+            length: 5,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1386,6 +1467,8 @@ mod tests {
             array_mask: vec![true, false, true, false, true],
             max_definition: 3,
             level_type: LevelType::Primitive(true),
+            offset: 0,
+            length: 5,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1399,6 +1482,8 @@ mod tests {
             array_mask: vec![true, true, true, false, true, true, true],
             max_definition: 3,
             level_type: LevelType::Primitive(true),
+            offset: 0,
+            length: 6,
         };
 
         let expected = vec![0, 1, 2, 3, 4, 5];
@@ -1427,7 +1512,7 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
                 .unwrap();
 
-        let batch_level = LevelInfo::new_from_batch(&batch);
+        let batch_level = LevelInfo::new(0, batch.num_rows());
         let struct_null_level =
             batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
 
@@ -1451,7 +1536,7 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
                 .unwrap();
 
-        let batch_level = LevelInfo::new_from_batch(&batch);
+        let batch_level = LevelInfo::new(0, batch.num_rows());
         let struct_non_null_level =
             batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
 
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index b0b25f9..0d0cbef 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -290,6 +290,7 @@ impl WriterPropertiesBuilder {
 
     /// Sets max size for a row group.
     pub fn set_max_row_group_size(mut self, value: usize) -> Self {
+        assert!(value > 0, "Cannot have a 0 max row group size");
         self.max_row_group_size = value;
         self
     }