Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/01 14:28:22 UTC

[arrow-rs] branch master updated: Batch multiple records in ArrowWriter (#1214)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 891b8d0  Batch multiple records in ArrowWriter (#1214)
891b8d0 is described below

commit 891b8d0dbdcf970863bf4b337c10d41274a1ecdd
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Feb 1 14:28:15 2022 +0000

    Batch multiple records in ArrowWriter (#1214)
    
    * Batch multiple records in ArrowWriter
    
    * Document max_group_size and reduce default (#1213)
    
    * Review feedback
    
    * Write multiple arrays without concat
    
    * Clippy
    
    * Test aggregating complex types
    
    * Test complex slice
    
    * Clippy
---
 arrow/src/util/pretty.rs          |  10 +-
 parquet/Cargo.toml                |   2 +-
 parquet/src/arrow/arrow_writer.rs | 434 ++++++++++++++++++++++++++++++++------
 parquet/src/arrow/levels.rs       |   9 +-
 parquet/src/file/properties.rs    |   6 +-
 5 files changed, 380 insertions(+), 81 deletions(-)
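
The new behaviour is easiest to see end to end: the writer now buffers incoming batches and only closes a row group once `max_row_group_size` rows have accumulated, flushing any remainder on close. Below is a minimal sketch of that usage, mirroring the `test_aggregates_records` test added in this commit; the output path is hypothetical and not part of the change:

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("int", DataType::Int32, false)]));

        // `max_row_group_size` is a row count, not a byte size
        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        let file = std::fs::File::create("/tmp/batched.parquet")?;
        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;

        // Three small batches (100 + 50 + 300 rows) are buffered and written out
        // as row groups of 200, 200 and 50 rows, not one row group per batch
        for range in [0..100, 0..50, 200..500] {
            let array = Int32Array::from(range.collect::<Vec<i32>>());
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;
            writer.write(&batch)?;
        }
        writer.close()?;
        Ok(())
    }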

diff --git a/arrow/src/util/pretty.rs b/arrow/src/util/pretty.rs
index 91343ec..4b67f3d 100644
--- a/arrow/src/util/pretty.rs
+++ b/arrow/src/util/pretty.rs
@@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
             let mut cells = Vec::new();
             for col in 0..batch.num_columns() {
                 let column = batch.column(col);
-                cells.push(Cell::new(&array_value_to_string(&column, row)?));
+                cells.push(Cell::new(&array_value_to_string(column, row)?));
             }
             table.add_row(cells);
         }
@@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result<Table> {
 
     for col in columns {
         for row in 0..col.len() {
-            let cells = vec![Cell::new(&array_value_to_string(&col, row)?)];
+            let cells = vec![Cell::new(&array_value_to_string(col, row)?)];
             table.add_row(cells);
         }
     }
@@ -320,7 +320,7 @@ mod tests {
         let mut builder = FixedSizeBinaryBuilder::new(3, 3);
 
         builder.append_value(&[1, 2, 3]).unwrap();
-        builder.append_null();
+        builder.append_null().unwrap();
         builder.append_value(&[7, 8, 9]).unwrap();
 
         let array = Arc::new(builder.finish());
@@ -677,7 +677,7 @@ mod tests {
         )?;
 
         let mut buf = String::new();
-        write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap();
+        write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap();
 
         let s = vec![
             "+---+-----+",
@@ -689,7 +689,7 @@ mod tests {
             "| d | 100 |",
             "+---+-----+",
         ];
-        let expected = String::from(s.join("\n"));
+        let expected = s.join("\n");
         assert_eq!(expected, buf);
 
         Ok(())
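
An aside on the pretty.rs cleanups above: `pretty_format_batches` returns a value implementing `Display`, so the `.to_string()` removed here was redundant. A minimal sketch of formatting the result directly, assuming the arrow crate is built with its `prettyprint` feature:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::record_batch::RecordBatch;
    use arrow::util::pretty::pretty_format_batches;

    fn main() -> arrow::error::Result<()> {
        let batch = RecordBatch::try_from_iter(vec![(
            "a",
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        )])?;

        // Format the Display value directly; no intermediate String is required
        println!("{}", pretty_format_batches(&[batch])?);
        Ok(())
    }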
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 0765da1..3dd65f6 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -55,7 +55,7 @@ brotli = "3.3"
 flate2 = "1.0"
 lz4 = "1.23"
 serde_json = { version = "1.0", features = ["preserve_order"] }
-arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
+arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }
 
 [features]
 default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index fc3a567..c0205c4 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -17,9 +17,11 @@
 
 //! Contains writer which writes arrow data into parquet data.
 
+use std::collections::VecDeque;
 use std::sync::Arc;
 
 use arrow::array as arrow_array;
+use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
@@ -40,14 +42,25 @@ use crate::{
 
 /// Arrow writer
 ///
-/// Writes Arrow `RecordBatch`es to a Parquet writer
+/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering `RecordBatch`es in order
+/// to produce row groups with `max_row_group_size` rows. Any remaining rows will be
+/// flushed on close, so the final row group in the output file may contain fewer
+/// than `max_row_group_size` rows.
 pub struct ArrowWriter<W: ParquetWriter> {
     /// Underlying Parquet writer
     writer: SerializedFileWriter<W>,
+
+    /// For each column, maintain an ordered queue of arrays to write
+    buffer: Vec<VecDeque<ArrayRef>>,
+
+    /// The total number of rows currently buffered
+    buffered_rows: usize,
+
     /// A copy of the Arrow schema.
     ///
     /// The schema is used to verify that each record batch written has the correct schema
     arrow_schema: SchemaRef,
+
     /// The length of arrays to write to each row group
     max_row_group_size: usize,
 }
@@ -75,18 +88,18 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
 
         Ok(Self {
             writer: file_writer,
+            buffer: vec![Default::default(); arrow_schema.fields().len()],
+            buffered_rows: 0,
             arrow_schema,
             max_row_group_size,
         })
     }
 
-    /// Write a RecordBatch to writer
+    /// Enqueues the provided `RecordBatch` to be written
     ///
-    /// The writer will slice the `batch` into `max_row_group_size`,
-    /// but if a batch has left-over rows less than the row group size,
-    /// the last row group will have fewer records.
-    /// This is currently a limitation  because we close the row group
-    /// instead of keeping it open for the next batch.
+    /// If this causes more than `max_row_group_size` rows to be buffered, one or more
+    /// row groups of `max_row_group_size` rows will be flushed out, and any fully
+    /// written `RecordBatch` will be dropped from the buffer
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         // validate batch schema against writer's supplied schema
         if self.arrow_schema != batch.schema() {
@@ -94,58 +107,118 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
                 "Record batch schema does not match writer schema".to_string(),
             ));
         }
-        // Track the number of rows being written in the batch.
-        // We currently do not have a way of slicing nested arrays, thus we
-        // track this manually.
-        let num_rows = batch.num_rows();
-        let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
-        let min_batch = num_rows.min(self.max_row_group_size);
-        for batch_index in 0..batches {
-            // Determine the offset and length of arrays
-            let offset = batch_index * min_batch;
-            let length = (num_rows - offset).min(self.max_row_group_size);
-
-            // Compute the definition and repetition levels of the batch
-            let batch_level = LevelInfo::new(offset, length);
-            let mut row_group_writer = self.writer.next_row_group()?;
-            for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
-                let mut levels = batch_level.calculate_array_levels(array, field);
-                // Reverse levels as we pop() them when writing arrays
-                levels.reverse();
-                write_leaves(&mut row_group_writer, array, &mut levels)?;
+
+        for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
+            buffer.push_back(column.clone())
+        }
+
+        self.buffered_rows += batch.num_rows();
+
+        self.flush_completed()?;
+
+        Ok(())
+    }
+
+    /// Flushes buffered data until fewer than `max_row_group_size` rows remain buffered
+    fn flush_completed(&mut self) -> Result<()> {
+        while self.buffered_rows >= self.max_row_group_size {
+            self.flush_row_group(self.max_row_group_size)?;
+        }
+        Ok(())
+    }
+
+    /// Flushes `num_rows` from the buffer into a new row group
+    fn flush_row_group(&mut self, num_rows: usize) -> Result<()> {
+        if num_rows == 0 {
+            return Ok(());
+        }
+
+        assert!(
+            num_rows <= self.buffered_rows,
+            "cannot flush {} rows only have {}",
+            num_rows,
+            self.buffered_rows
+        );
+
+        assert!(
+            num_rows <= self.max_row_group_size,
+            "cannot flush {} rows would exceed max row group size of {}",
+            num_rows,
+            self.max_row_group_size
+        );
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields())
+        {
+            // Collect the number of arrays to append
+            let mut remaining = num_rows;
+            let mut arrays = Vec::with_capacity(col_buffer.len());
+            while remaining != 0 {
+                match col_buffer.pop_front() {
+                    Some(next) if next.len() > remaining => {
+                        col_buffer
+                            .push_front(next.slice(remaining, next.len() - remaining));
+                        arrays.push(next.slice(0, remaining));
+                        remaining = 0;
+                    }
+                    Some(next) => {
+                        remaining -= next.len();
+                        arrays.push(next);
+                    }
+                    _ => break,
+                }
             }
 
-            self.writer.close_row_group(row_group_writer)?;
+            let mut levels: Vec<_> = arrays
+                .iter()
+                .map(|array| {
+                    let batch_level = LevelInfo::new(0, array.len());
+                    let mut levels = batch_level.calculate_array_levels(array, field);
+                    // Reverse levels as we pop() them when writing arrays
+                    levels.reverse();
+                    levels
+                })
+                .collect();
+
+            write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?;
         }
 
+        self.writer.close_row_group(row_group_writer)?;
+        self.buffered_rows -= num_rows;
+
         Ok(())
     }
 
     /// Close and finalize the underlying Parquet writer
     pub fn close(&mut self) -> Result<parquet_format::FileMetaData> {
+        self.flush_completed()?;
+        self.flush_row_group(self.buffered_rows)?;
         self.writer.close()
     }
 }
 
 /// Convenience method to get the next ColumnWriter from the RowGroupWriter
 #[inline]
-#[allow(clippy::borrowed_box)]
-fn get_col_writer(
-    row_group_writer: &mut Box<dyn RowGroupWriter>,
-) -> Result<ColumnWriter> {
+fn get_col_writer(row_group_writer: &mut dyn RowGroupWriter) -> Result<ColumnWriter> {
     let col_writer = row_group_writer
         .next_column()?
         .expect("Unable to get column writer");
     Ok(col_writer)
 }
 
-#[allow(clippy::borrowed_box)]
 fn write_leaves(
-    row_group_writer: &mut Box<dyn RowGroupWriter>,
-    array: &arrow_array::ArrayRef,
-    levels: &mut Vec<LevelInfo>,
+    row_group_writer: &mut dyn RowGroupWriter,
+    arrays: &[ArrayRef],
+    levels: &mut [Vec<LevelInfo>],
 ) -> Result<()> {
-    match array.data_type() {
+    assert_eq!(arrays.len(), levels.len());
+    assert!(!arrays.is_empty());
+
+    let data_type = arrays.first().unwrap().data_type().clone();
+    assert!(arrays.iter().all(|a| a.data_type() == &data_type));
+
+    match &data_type {
         ArrowDataType::Null
         | ArrowDataType::Boolean
         | ArrowDataType::Int8
@@ -172,50 +245,76 @@ fn write_leaves(
         | ArrowDataType::Decimal(_, _)
         | ArrowDataType::FixedSizeBinary(_) => {
             let mut col_writer = get_col_writer(row_group_writer)?;
-            write_leaf(
-                &mut col_writer,
-                array,
-                levels.pop().expect("Levels exhausted"),
-            )?;
+            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+                write_leaf(
+                    &mut col_writer,
+                    array,
+                    levels.pop().expect("Levels exhausted"),
+                )?;
+            }
             row_group_writer.close_column(col_writer)?;
             Ok(())
         }
         ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
-            // write the child list
-            let data = array.data();
-            let child_array = arrow_array::make_array(data.child_data()[0].clone());
-            write_leaves(row_group_writer, &child_array, levels)?;
+            let arrays: Vec<_> = arrays.iter().map(|array|{
+                // write the child list
+                let data = array.data();
+                arrow_array::make_array(data.child_data()[0].clone())
+            }).collect();
+
+            write_leaves(row_group_writer, &arrays, levels)?;
             Ok(())
         }
-        ArrowDataType::Struct(_) => {
-            let struct_array: &arrow_array::StructArray = array
-                .as_any()
-                .downcast_ref::<arrow_array::StructArray>()
-                .expect("Unable to get struct array");
-            for field in struct_array.columns() {
-                write_leaves(row_group_writer, field, levels)?;
+        ArrowDataType::Struct(fields) => {
+            // Groups child arrays by field
+            let mut field_arrays = vec![Vec::with_capacity(arrays.len()); fields.len()];
+
+            for array in arrays {
+                let struct_array: &arrow_array::StructArray = array
+                    .as_any()
+                    .downcast_ref::<arrow_array::StructArray>()
+                    .expect("Unable to get struct array");
+
+                assert_eq!(struct_array.columns().len(), fields.len());
+
+                for (child_array, field) in field_arrays.iter_mut().zip(struct_array.columns()) {
+                    child_array.push(field.clone())
+                }
+            }
+
+            for field in field_arrays {
+                write_leaves(row_group_writer, &field, levels)?;
             }
+
             Ok(())
         }
         ArrowDataType::Map(_, _) => {
-            let map_array: &arrow_array::MapArray = array
-                .as_any()
-                .downcast_ref::<arrow_array::MapArray>()
-                .expect("Unable to get map array");
-            write_leaves(row_group_writer, &map_array.keys(), levels)?;
-            write_leaves(row_group_writer, &map_array.values(), levels)?;
+            let mut keys = Vec::with_capacity(arrays.len());
+            let mut values = Vec::with_capacity(arrays.len());
+            for array in arrays {
+                let map_array: &arrow_array::MapArray = array
+                    .as_any()
+                    .downcast_ref::<arrow_array::MapArray>()
+                    .expect("Unable to get map array");
+                keys.push(map_array.keys());
+                values.push(map_array.values());
+            }
+
+            write_leaves(row_group_writer, &keys, levels)?;
+            write_leaves(row_group_writer, &values, levels)?;
             Ok(())
         }
         ArrowDataType::Dictionary(_, value_type) => {
-            // cast dictionary to a primitive
-            let array = arrow::compute::cast(array, value_type)?;
-
             let mut col_writer = get_col_writer(row_group_writer)?;
-            write_leaf(
-                &mut col_writer,
-                &array,
-                levels.pop().expect("Levels exhausted"),
-            )?;
+            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+                // cast dictionary to a primitive
+                let array = arrow::compute::cast(array, value_type)?;
+                write_leaf(
+                    &mut col_writer,
+                    &array,
+                    levels.pop().expect("Levels exhausted"),
+                )?;
+            }
             row_group_writer.close_column(col_writer)?;
             Ok(())
         }
@@ -226,7 +325,7 @@ fn write_leaves(
             Err(ParquetError::NYI(
                 format!(
                     "Attempting to write an Arrow type {:?} to parquet that is not yet implemented",
-                    array.data_type()
+                    data_type
                 )
             ))
         }
@@ -593,10 +692,13 @@ mod tests {
 
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
+    use arrow::error::Result as ArrowResult;
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_batches;
     use arrow::{array::*, buffer::Buffer};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::file::metadata::ParquetMetaData;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
         statistics::Statistics,
@@ -1723,4 +1825,202 @@ mod tests {
 
         one_column_roundtrip(array, true, Some(10));
     }
+
+    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
+        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
+    }
+
+    #[test]
+    fn test_aggregates_records() {
+        let arrays = [
+            Int32Array::from((0..100).collect::<Vec<_>>()),
+            Int32Array::from((0..50).collect::<Vec<_>>()),
+            Int32Array::from((200..500).collect::<Vec<_>>()),
+        ];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "int",
+            ArrowDataType::Int32,
+            false,
+        )]));
+
+        let file = tempfile::tempfile().unwrap();
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(200)
+            .build();
+
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props))
+                .unwrap();
+
+        for array in arrays {
+            let batch =
+                RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+            writer.write(&batch).unwrap();
+        }
+
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+        assert_eq!(&row_group_sizes(reader.metadata()), &[200, 200, 50]);
+
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let batches = arrow_reader
+            .get_record_reader(100)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(batches.len(), 5);
+        assert!(batches.iter().all(|x| x.num_columns() == 1));
+
+        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+
+        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
+
+        let values: Vec<_> = batches
+            .iter()
+            .flat_map(|x| {
+                x.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .cloned()
+            })
+            .collect();
+
+        let expected_values: Vec<_> =
+            [0..100, 0..50, 200..500].into_iter().flatten().collect();
+        assert_eq!(&values, &expected_values)
+    }
+
+    #[test]
+    fn complex_aggregate() {
+        // Tests aggregating nested data
+        let field_a = Field::new("leaf_a", DataType::Int32, false);
+        let field_b = Field::new("leaf_b", DataType::Int32, true);
+        let struct_a = Field::new(
+            "struct_a",
+            DataType::Struct(vec![field_a.clone(), field_b.clone()]),
+            true,
+        );
+
+        let list_a = Field::new("list", DataType::List(Box::new(struct_a)), true);
+        let struct_b =
+            Field::new("struct_b", DataType::Struct(vec![list_a.clone()]), false);
+
+        let schema = Arc::new(Schema::new(vec![struct_b]));
+
+        // create nested data
+        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
+        let field_b_array =
+            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
+
+        let struct_a_array = StructArray::from(vec![
+            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
+            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
+        ]);
+
+        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
+            .len(5)
+            .add_buffer(Buffer::from_iter(vec![
+                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
+            ]))
+            .null_bit_buffer(Buffer::from_iter(vec![true, false, true, false, true]))
+            .child_data(vec![struct_a_array.data().clone()])
+            .build()
+            .unwrap();
+
+        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
+        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
+
+        let batch1 = RecordBatch::try_from_iter(vec![(
+            "struct_b",
+            Arc::new(struct_b_array) as ArrayRef,
+        )])
+        .unwrap();
+
+        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
+        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
+
+        let struct_a_array = StructArray::from(vec![
+            (field_a, Arc::new(field_a_array) as ArrayRef),
+            (field_b, Arc::new(field_b_array) as ArrayRef),
+        ]);
+
+        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
+            .len(2)
+            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
+            .child_data(vec![struct_a_array.data().clone()])
+            .build()
+            .unwrap();
+
+        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
+        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
+
+        let batch2 = RecordBatch::try_from_iter(vec![(
+            "struct_b",
+            Arc::new(struct_b_array) as ArrayRef,
+        )])
+        .unwrap();
+
+        let batches = &[batch1, batch2];
+
+        // Verify data is as expected
+
+        let expected = r#"
+            +-------------------------------------------------------------------------------------------------------------------------------------+
+            | struct_b                                                                                                                            |
+            +-------------------------------------------------------------------------------------------------------------------------------------+
+            | {"list": [{"leaf_a": 1, "leaf_b": 1}]}                                                                                              |
+            | {"list": null}                                                                                                                      |
+            | {"list": [{"leaf_a": 2, "leaf_b": null}, {"leaf_a": 3, "leaf_b": 2}]}                                                               |
+            | {"list": null}                                                                                                                      |
+            | {"list": [{"leaf_a": 4, "leaf_b": null}, {"leaf_a": 5, "leaf_b": null}]}                                                            |
+            | {"list": [{"leaf_a": 6, "leaf_b": null}, {"leaf_a": 7, "leaf_b": null}, {"leaf_a": 8, "leaf_b": null}, {"leaf_a": 9, "leaf_b": 1}]} |
+            | {"list": [{"leaf_a": 10, "leaf_b": null}]}                                                                                          |
+            +-------------------------------------------------------------------------------------------------------------------------------------+
+        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
+
+        let actual = pretty_format_batches(batches).unwrap().to_string();
+        assert_eq!(actual, expected);
+
+        // Write data
+        let file = tempfile::tempfile().unwrap();
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(6)
+            .build();
+
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
+
+        for batch in batches {
+            writer.write(batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        // Read Data
+        let reader = SerializedFileReader::new(file).unwrap();
+
+        // Should have written entire first batch and first row of second to the first row group
+        // leaving a single row in the second row group
+        assert_eq!(&row_group_sizes(reader.metadata()), &[6, 1]);
+
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let batches = arrow_reader
+            .get_record_reader(2)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(batches.len(), 4);
+        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
+
+        let actual = pretty_format_batches(&batches).unwrap().to_string();
+        assert_eq!(actual, expected);
+    }
 }
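
The key technique in the arrow_writer.rs changes above is that buffered arrays are never concatenated: `flush_row_group` drains whole arrays from each column's queue and zero-copy slices the array that straddles the row group boundary. A self-contained sketch of that draining step in isolation (`take_rows` is a hypothetical helper for illustration, not part of the crate):

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array};

    fn take_rows(buffer: &mut VecDeque<ArrayRef>, mut remaining: usize) -> Vec<ArrayRef> {
        let mut arrays = Vec::new();
        while remaining != 0 {
            match buffer.pop_front() {
                // The front array is larger than needed: keep the tail buffered
                // and emit only the head slice (both slices share the same buffers)
                Some(next) if next.len() > remaining => {
                    buffer.push_front(next.slice(remaining, next.len() - remaining));
                    arrays.push(next.slice(0, remaining));
                    remaining = 0;
                }
                // The front array fits entirely within this row group
                Some(next) => {
                    remaining -= next.len();
                    arrays.push(next);
                }
                None => break,
            }
        }
        arrays
    }

    fn main() {
        let mut buffer: VecDeque<ArrayRef> = VecDeque::new();
        buffer.push_back(Arc::new(Int32Array::from(vec![1, 2, 3])));
        buffer.push_back(Arc::new(Int32Array::from(vec![4, 5, 6, 7])));

        // Draining 5 rows yields [1, 2, 3] plus a slice [4, 5]; [6, 7] stays buffered
        let arrays = take_rows(&mut buffer, 5);
        assert_eq!(arrays.iter().map(|a| a.len()).sum::<usize>(), 5);
        assert_eq!(buffer.front().unwrap().len(), 2);
    }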
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index ea1f212..20b0ff7 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -40,7 +40,7 @@
 //!
 //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
 
-use arrow::array::{make_array, ArrayRef, MapArray, StructArray};
+use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray};
 use arrow::datatypes::{DataType, Field};
 
 /// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
@@ -711,12 +711,11 @@ impl LevelInfo {
                 ((0..=(len as i64)).collect(), array_mask)
             }
             DataType::List(_) | DataType::Map(_, _) => {
-                let data = array.data();
-                let offsets = unsafe { data.buffers()[0].typed_data::<i32>() };
+                let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
                 let offsets = offsets
                     .to_vec()
                     .into_iter()
-                    .skip(offset)
+                    .skip(array.offset() + offset)
                     .take(len + 1)
                     .map(|v| v as i64)
                     .collect::<Vec<i64>>();
@@ -729,7 +728,7 @@ impl LevelInfo {
             DataType::LargeList(_) => {
                 let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
                     .iter()
-                    .skip(offset)
+                    .skip(array.offset() + offset)
                     .take(len + 1)
                     .copied()
                     .collect();
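
The levels.rs change above accounts for sliced arrays: slicing is zero-copy in Arrow, so a slice shares its parent's buffers and records its starting position via `offset()`, which now has to be added to the caller-supplied offset when reading list offsets. A minimal sketch of that property:

    use arrow::array::{Array, Int32Array};

    fn main() {
        let array = Int32Array::from(vec![10, 20, 30, 40, 50, 60]);

        // The slice shares the original buffer; its view begins at index 2
        let sliced = array.slice(2, 3);
        assert_eq!(sliced.offset(), 2);
        assert_eq!(sliced.len(), 3);
    }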
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c48e4e7..b42516e 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -62,7 +62,7 @@ const DEFAULT_DICTIONARY_ENABLED: bool = true;
 const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
 const DEFAULT_STATISTICS_ENABLED: bool = true;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
-const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 128 * 1024 * 1024;
+const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
 
 /// Parquet writer version.
@@ -129,7 +129,7 @@ impl WriterProperties {
         self.write_batch_size
     }
 
-    /// Returns max size for a row group.
+    /// Returns maximum number of rows in a row group.
     pub fn max_row_group_size(&self) -> usize {
         self.max_row_group_size
     }
@@ -288,7 +288,7 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets max size for a row group.
+    /// Sets maximum number of rows in a row group.
     pub fn set_max_row_group_size(mut self, value: usize) -> Self {
         assert!(value > 0, "Cannot have a 0 max row group size");
         self.max_row_group_size = value;
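
Finally, the properties.rs change above makes explicit that `max_row_group_size` is a row count rather than a byte size, and lowers the default from 128 * 1024 * 1024 to 1024 * 1024 rows. A minimal sketch of reading the new default and overriding it:

    use parquet::file::properties::WriterProperties;

    fn main() {
        // The default is now 1024 * 1024 rows per row group
        let default_props = WriterProperties::builder().build();
        assert_eq!(default_props.max_row_group_size(), 1024 * 1024);

        // The value counts rows, not bytes
        let props = WriterProperties::builder()
            .set_max_row_group_size(10_000)
            .build();
        assert_eq!(props.max_row_group_size(), 10_000);
    }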