You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/26 19:41:50 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #1746: Support writing nested lists to parquet

alamb commented on code in PR #1746:
URL: https://github.com/apache/arrow-rs/pull/1746#discussion_r882977357


##########
parquet/src/arrow/arrow_writer.rs:
##########
@@ -341,26 +341,24 @@ fn write_leaf(
     column: &ArrayRef,
     levels: LevelInfo,
 ) -> Result<i64> {
-    let indices = levels.filter_array_indices();
-    // Slice array according to computed offset and length
-    let column = column.slice(levels.offset, levels.length);
+    // TODO: Avoid filtering if no need

Review Comment:
   Should this be a follow-on ticket to track? It doesn't seem like this PR is any worse in that department.



##########
parquet/src/arrow/levels.rs:
##########
@@ -40,114 +40,32 @@
 //!
 //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
 
-use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray};
+use crate::errors::{ParquetError, Result};
+use arrow::array::{
+    make_array, Array, ArrayData, ArrayRef, GenericListArray, MapArray, OffsetSizeTrait,
+    StructArray,
+};
 use arrow::datatypes::{DataType, Field};
-
-/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
-///
-/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track
-/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo]
-/// is created, and this is what is used to index into the array when writing data to Parquet.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub(crate) struct LevelInfo {
-    /// Array's definition levels
-    pub definition: Vec<i16>,
-    /// Array's optional repetition levels
-    pub repetition: Option<Vec<i16>>,
-    /// Array's offsets, 64-bit is used to accommodate large offset arrays
-    pub array_offsets: Vec<i64>,
-    // TODO: Convert to an Arrow Buffer after ARROW-10766 is merged.
-    /// Array's logical validity mask, whcih gets unpacked for list children.
-    /// If the parent of an array is null, all children are logically treated as
-    /// null. This mask keeps track of that.
-    ///
-    pub array_mask: Vec<bool>,
-    /// The maximum definition at this level, 0 at the record batch
-    pub max_definition: i16,
-    /// The type of array represented by this level info
-    pub level_type: LevelType,
-    /// The offset of the current level's array
-    pub offset: usize,
-    /// The length of the current level's array
-    pub length: usize,
-}
-
-/// LevelType defines the type of level, and whether it is nullable or not
-#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-pub(crate) enum LevelType {
-    Root,
-    List(bool),
-    Struct(bool),
-    Primitive(bool),
-}
-
-impl LevelType {
-    #[inline]
-    const fn level_increment(&self) -> i16 {
-        match self {
-            LevelType::Root => 0,
-            // List repetition adds a constant 1
-            LevelType::List(is_nullable) => 1 + *is_nullable as i16,
-            LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
-                *is_nullable as i16
-            }
-        }
-    }
+use std::ops::Range;
+
+/// Performs a depth-first scan of the children of `array`, constructing [`LevelInfo`]
+/// for each leaf column encountered
+pub(crate) fn calculate_array_levels(
+    array: &ArrayRef,
+    field: &Field,
+) -> Result<Vec<LevelInfo>> {
+    let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
+    builder.write(array, 0..array.len());
+    Ok(builder.finish())
 }
 
-impl LevelInfo {
-    /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset.
-    ///
-    /// This is a convenience function to populate the starting point of the traversal.
-    pub(crate) fn new(offset: usize, length: usize) -> Self {
-        Self {
-            // a batch has no definition level yet
-            definition: vec![0; length],
-            // a batch has no repetition as it is not a list
-            repetition: None,
-            // a batch has sequential offsets, should be num_rows + 1
-            array_offsets: (0..=(length as i64)).collect(),
-            // all values at a batch-level are non-null
-            array_mask: vec![true; length],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset,
-            length,
-        }
-    }
-
-    /// Compute nested levels of the Arrow array, recursing into lists and structs.
-    ///
-    /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays.
-    ///
-    /// The parent struct's nullness is tracked, as it determines whether the child
-    /// max_definition should be incremented.
-    /// The 'is_parent_struct' variable asks "is this field's parent a struct?".
-    /// * If we are starting at a [RecordBatch](arrow::record_batch::RecordBatch), this is `false`.
-    /// * If we are calculating a list's child, this is `false`.
-    /// * If we are calculating a struct (i.e. `field.data_type90 == Struct`),
-    /// this depends on whether the struct is a child of a struct.
-    /// * If we are calculating a field inside a [StructArray], this is 'true'.
-    pub(crate) fn calculate_array_levels(
-        &self,
-        array: &ArrayRef,
-        field: &Field,
-    ) -> Vec<Self> {
-        let (array_offsets, array_mask) =
-            Self::get_array_offsets_and_masks(array, self.offset, self.length);
-        match array.data_type() {
-            DataType::Null => vec![Self {
-                definition: self.definition.clone(),
-                repetition: self.repetition.clone(),
-                array_offsets,
-                array_mask,
-                max_definition: self.max_definition.max(1),
-                // Null type is always nullable
-                level_type: LevelType::Primitive(true),
-                offset: self.offset,
-                length: self.length,
-            }],
-            DataType::Boolean
+/// Returns true if the DataType can be represented as a primitive parquet column,
+/// i.e. a leaf array with no children
+fn is_leaf(data_type: &DataType) -> bool {
+    matches!(
+        data_type,

Review Comment:
   I wonder if this would be more future-proof if it were `!matches!(DataType::Dictionary...)` -- i.e. explicitly list out the types that are handled in the rest of this module rather than trying to enumerate the converse.
   
   I am just thinking about what would happen if someone added a new non-leaf type to arrow's `DataType`.



##########
parquet/src/arrow/levels.rs:
##########
@@ -825,495 +483,168 @@ mod tests {
 
     use arrow::array::*;
     use arrow::buffer::Buffer;
-    use arrow::datatypes::{Schema, ToByteSlice};
+    use arrow::datatypes::{Int32Type, Schema, ToByteSlice};
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_columns;
 
     #[test]
     fn test_calculate_array_levels_twitter_example() {
         // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
         // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
-        let parent_levels = LevelInfo {
-            definition: vec![0, 0],
-            repetition: None,
-            array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
-            array_mask: vec![true, true], // both lists defined
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 2,
-        };
-        // offset into array, each level1 has 2 values
-        let array_offsets = vec![0, 2, 4];
-        let array_mask = vec![true, true];
-
-        // calculate level1 levels
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        //
-        let expected_levels = LevelInfo {
-            definition: vec![1, 1, 1, 1],
-            repetition: Some(vec![0, 1, 0, 1]),
-            array_offsets,
-            array_mask: vec![true, true, true, true],
-            max_definition: 1,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 4,
-        };
-        // the separate asserts make it easier to see what's failing
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        // this assert is to help if there are more variables added to the struct
-        assert_eq!(&levels, &expected_levels);
-
-        // level2
-        let parent_levels = levels;
-        let array_offsets = vec![0, 3, 7, 8, 10];
-        let array_mask = vec![true, true, true, true];
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        let expected_levels = LevelInfo {
-            definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
-            repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
-            array_offsets,
-            array_mask: vec![true; 10],
-            max_definition: 2,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 10,
+
+        let leaf_type = Field::new("item", DataType::Int32, false);
+        let inner_type = DataType::List(Box::new(leaf_type));
+        let inner_field = Field::new("l2", inner_type.clone(), false);
+        let outer_type = DataType::List(Box::new(inner_field));
+        let outer_field = Field::new("l1", outer_type.clone(), false);
+
+        let primitives = Int32Array::from_iter(0..10);
+
+        // Cannot use from_iter_primitive as always infers nullable
+        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
+        let inner_list = ArrayDataBuilder::new(inner_type)
+            .len(4)
+            .add_buffer(offsets)
+            .add_child_data(primitives.data().clone())
+            .build()
+            .unwrap();
+
+        let offsets = Buffer::from_iter([0_i32, 2, 4]);
+        let outer_list = ArrayDataBuilder::new(outer_type)
+            .len(2)
+            .add_buffer(offsets)
+            .add_child_data(inner_list)
+            .build()
+            .unwrap();
+        let outer_list = make_array(outer_list);
+
+        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected = LevelInfo {
+            def_levels: Some(vec![2; 10]),
+            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
+            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+            max_def_level: 2,
+            max_rep_level: 2,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected);
     }
 
     #[test]
     fn test_calculate_one_level_1() {
         // This test calculates the levels for a non-null primitive array
-        let parent_levels = LevelInfo {
-            definition: vec![0; 10],
-            repetition: None,
-            array_offsets: (0..=10).collect(),
-            array_mask: vec![true; 10],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 10,
-        };
-        let array_offsets: Vec<i64> = (0..=10).collect();
-        let array_mask = vec![true; 10];
+        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
+        let field = Field::new("item", DataType::Int32, false);
+
+        let levels = calculate_array_levels(&array, &field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask.clone(),
-            LevelType::Primitive(false),
-        );
         let expected_levels = LevelInfo {
-            // As it is non-null, definitions can be omitted
-            definition: vec![0; 10],
-            repetition: None,
-            array_offsets,
-            array_mask,
-            max_definition: 0,
-            level_type: LevelType::Primitive(false),
-            offset: 0,
-            length: 10,
+            def_levels: None,
+            rep_levels: None,
+            non_null_indices: (0..10).collect(),
+            max_def_level: 0,
+            max_rep_level: 0,
         };
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
     fn test_calculate_one_level_2() {
-        // This test calculates the levels for a non-null primitive array
-        let parent_levels = LevelInfo {
-            definition: vec![0; 5],
-            repetition: None,
-            array_offsets: (0..=5).collect(),
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 5,
-        };
-        let array_offsets: Vec<i64> = (0..=5).collect();
-        let array_mask = vec![true, false, true, true, false];
+        // This test calculates the levels for a nullable primitive array
+        let array = Arc::new(Int32Array::from_iter([
+            Some(0),
+            None,
+            Some(0),
+            Some(0),
+            None,
+        ])) as ArrayRef;
+        let field = Field::new("item", DataType::Int32, true);
+
+        let levels = calculate_array_levels(&array, &field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask.clone(),
-            LevelType::Primitive(true),
-        );
         let expected_levels = LevelInfo {
-            definition: vec![1, 0, 1, 1, 0],
-            repetition: None,
-            array_offsets,
-            array_mask,
-            max_definition: 1,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![1, 0, 1, 1, 0]),

Review Comment:
   I personally find the new tests easier to read 👍 



##########
parquet/src/arrow/levels.rs:
##########
@@ -171,650 +89,390 @@ impl LevelInfo {
             | DataType::Binary
             | DataType::LargeBinary
             | DataType::Decimal(_, _)
-            | DataType::FixedSizeBinary(_) => {
-                // we return a vector of 1 value to represent the primitive
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            | DataType::FixedSizeBinary(_)
+    )
+}
+
+/// The definition and repetition level of an array within a potentially nested hierarchy
+#[derive(Debug, Default, Clone, Copy)]
+struct LevelContext {
+    /// The current repetition level
+    rep_level: i16,
+    /// The current definition level
+    def_level: i16,
+}
+
+/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+enum LevelInfoBuilder {
+    Primitive(LevelInfo),
+    List(Box<LevelInfoBuilder>, LevelContext),
+    Struct(Vec<LevelInfoBuilder>, LevelContext),
+}
+
+impl LevelInfoBuilder {
+    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
+    fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
+        match field.data_type() {
+            d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
+                parent_ctx,
+                field.is_nullable(),
+            ))),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => Ok(Self::Primitive(
+                LevelInfo::new(parent_ctx, field.is_nullable()),
+            )),
+            DataType::Struct(children) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 1,
+                    false => parent_ctx.def_level,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level,
+                    def_level,
+                };
+
+                let children = children
+                    .iter()
+                    .map(|f| Self::try_new(f, ctx))
+                    .collect::<Result<_>>()?;
+
+                Ok(Self::Struct(children, ctx))
             }
-            DataType::List(list_field) | DataType::LargeList(list_field) => {
-                let child_offset = array_offsets[0] as usize;
-                let child_len = *array_offsets.last().unwrap() as usize;
-                // Calculate the list level
-                let list_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::List(field.is_nullable()),
-                );
-
-                // Construct the child array of the list, and get its offset + mask
-                let array_data = array.data();
-                let child_data = array_data.child_data().get(0).unwrap();
-                let child_array = make_array(child_data.clone());
-                let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
-                    &child_array,
-                    child_offset,
-                    child_len - child_offset,
-                );
-
-                match child_array.data_type() {
-                    DataType::Null
-                    | DataType::Boolean
-                    | DataType::Int8
-                    | DataType::Int16
-                    | DataType::Int32
-                    | DataType::Int64
-                    | DataType::UInt8
-                    | DataType::UInt16
-                    | DataType::UInt32
-                    | DataType::UInt64
-                    | DataType::Float16
-                    | DataType::Float32
-                    | DataType::Float64
-                    | DataType::Timestamp(_, _)
-                    | DataType::Date32
-                    | DataType::Date64
-                    | DataType::Time32(_)
-                    | DataType::Time64(_)
-                    | DataType::Duration(_)
-                    | DataType::Interval(_)
-                    | DataType::Binary
-                    | DataType::LargeBinary
-                    | DataType::Utf8
-                    | DataType::LargeUtf8
-                    | DataType::Dictionary(_, _)
-                    | DataType::Decimal(_, _)
-                    | DataType::FixedSizeBinary(_) => {
-                        vec![list_level.calculate_child_levels(
-                            child_offsets,
-                            child_mask,
-                            LevelType::Primitive(list_field.is_nullable()),
-                        )]
-                    }
-                    DataType::List(_)
-                    | DataType::LargeList(_)
-                    | DataType::Struct(_)
-                    | DataType::Map(_, _) => {
-                        list_level.calculate_array_levels(&child_array, list_field)
-                    }
-                    DataType::FixedSizeList(_, _) => unimplemented!(),
-                    DataType::Union(_, _, _) => unimplemented!(),
-                }
+            DataType::List(child)
+            | DataType::LargeList(child)
+            | DataType::Map(child, _) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 2,
+                    false => parent_ctx.def_level + 1,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level + 1,
+                    def_level,
+                };
+
+                let child = Self::try_new(child.as_ref(), ctx)?;
+                Ok(Self::List(Box::new(child), ctx))
             }
-            DataType::Map(map_field, _) => {
-                // Calculate the map level
-                let map_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    // A map is treated like a list as it has repetition
-                    LevelType::List(field.is_nullable()),
-                );
-
-                let map_array = array.as_any().downcast_ref::<MapArray>().unwrap();
-
-                let key_array = map_array.keys();
-                let value_array = map_array.values();
-
-                if let DataType::Struct(fields) = map_field.data_type() {
-                    let key_field = &fields[0];
-                    let value_field = &fields[1];
-
-                    let mut map_levels = vec![];
-
-                    // Get key levels
-                    let mut key_levels =
-                        map_level.calculate_array_levels(&key_array, key_field);
-                    map_levels.append(&mut key_levels);
-
-                    let mut value_levels =
-                        map_level.calculate_array_levels(&value_array, value_field);
-                    map_levels.append(&mut value_levels);
-
-                    map_levels
-                } else {
-                    panic!(
-                        "Map field should be a struct, found {:?}",
-                        map_field.data_type()
-                    );
-                }
+            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
+        }
+    }
+
+    /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns
+    /// as enumerated by a depth-first search
+    fn finish(self) -> Vec<LevelInfo> {
+        match self {
+            LevelInfoBuilder::Primitive(v) => vec![v],
+            LevelInfoBuilder::List(v, _) => v.finish(),
+            LevelInfoBuilder::Struct(v, _) => {
+                v.into_iter().flat_map(|l| l.finish()).collect()
             }
-            DataType::FixedSizeList(_, _) => unimplemented!(),
-            DataType::Struct(struct_fields) => {
-                let struct_array: &StructArray = array
+        }
+    }
+
+    /// Given an `array`, write the level data for the elements in `range`
+    fn write(&mut self, array: &ArrayRef, range: Range<usize>) {
+        match array.data_type() {
+            d if is_leaf(d) => self.write_leaf(array, range),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+                self.write_leaf(array, range)
+            }
+            DataType::Struct(_) => {
+                let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+                self.write_struct(array, range)
+            }
+            DataType::List(_) => {
+                let array = array
                     .as_any()
-                    .downcast_ref::<StructArray>()
-                    .expect("Unable to get struct array");
-                let mut struct_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Struct(field.is_nullable()),
-                );
-
-                // If the parent field is a list, calculate the children of the struct as if it
-                // were a list as well.
-                if matches!(self.level_type, LevelType::List(_)) {
-                    struct_level.level_type = LevelType::List(false);
-                }
+                    .downcast_ref::<GenericListArray<i32>>()
+                    .unwrap();
+                self.write_list(array.value_offsets(), array.data(), range)
+            }
+            DataType::LargeList(_) => {
+                let array = array
+                    .as_any()
+                    .downcast_ref::<GenericListArray<i64>>()
+                    .unwrap();
 
-                let mut struct_levels = vec![];
-                struct_array
-                    .columns()
-                    .into_iter()
-                    .zip(struct_fields)
-                    .for_each(|(child_array, child_field)| {
-                        let mut levels =
-                            struct_level.calculate_array_levels(child_array, child_field);
-                        struct_levels.append(&mut levels);
-                    });
-                struct_levels
+                self.write_list(array.value_offsets(), array.data(), range)
             }
-            DataType::Union(_, _, _) => unimplemented!(),
-            DataType::Dictionary(_, _) => {
-                // Need to check for these cases not implemented in C++:
-                // - "Writing DictionaryArray with nested dictionary type not yet supported"
-                // - "Writing DictionaryArray with null encoded in dictionary type not yet supported"
-                // vec![self.get_primitive_def_levels(array, field, array_mask)]
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            DataType::Map(_, _) => {
+                let array = array.as_any().downcast_ref::<MapArray>().unwrap();
+                // A Map is just as ListArray<i32> with a StructArray child, we therefore
+                // treat it as such to avoid code duplication
+                self.write_list(array.value_offsets(), array.data(), range)
             }
+            _ => unreachable!(),
         }
     }
 
-    /// Calculate child/leaf array levels.
+    /// Write `range` elements from ListArray `array`
     ///
-    /// The algorithm works by incrementing definitions of array values based on whether:
-    /// - a value is optional or required (is_nullable)
-    /// - a list value is repeated + optional or required (is_list)
-    ///
-    /// A record batch always starts at a populated definition = level 0.
-    /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column `a`
-    /// can only have a maximum level of 1 if it is not null.
-    /// If it is not null, we increment by 1, such that the null slots will = level 1.
-    /// The above applies to types that have no repetition (anything not a list or map).
-    ///
-    /// If a batch has lists, then we increment by up to 2 levels:
-    /// - 1 level for the list (repeated)
-    /// - 1 level if the list itself is nullable (optional)
-    ///
-    /// A list's child then gets incremented using the above rules.
-    ///
-    /// *Exceptions*
-    ///
-    /// There are 2 exceptions from the above rules:
-    ///
-    /// 1. When at the root of the schema: We always increment the
-    /// level regardless of whether the child is nullable or not. If we do not do
-    /// this, we could have a non-nullable array having a definition of 0.
-    ///
-    /// 2. List parent, non-list child: We always increment the level in this case,
-    /// regardless of whether the child is nullable or not.
-    ///
-    /// *Examples*
-    ///
-    /// A batch with only a primitive that's non-nullable. `<primitive[required]>`:
-    /// * We don't increment the definition level as the array is not optional.
-    /// * This would leave us with a definition of 0, so the first exception applies.
-    /// * The definition level becomes 1.
-    ///
-    /// A batch with only a primitive that's nullable. `<primitive[optional]>`:
-    /// * The definition level becomes 1, as we increment it once.
-    ///
-    /// A batch with a single non-nullable list (both list and child not null):
-    /// * We calculate the level twice, for the list, and for the child.
-    /// * At the list, the level becomes 1, where 0 indicates that the list is
-    ///  empty, and 1 says it's not (determined through offsets).
-    /// * At the primitive level, the second exception applies. The level becomes 2.
-    fn calculate_child_levels(
-        &self,
-        // we use 64-bit offsets to also accommodate large arrays
-        array_offsets: Vec<i64>,
-        array_mask: Vec<bool>,
-        level_type: LevelType,
-    ) -> Self {
-        let min_len = *(array_offsets.last().unwrap()) as usize;
-        let mut definition = Vec::with_capacity(min_len);
-        let mut repetition = Vec::with_capacity(min_len);
-        let mut merged_array_mask = Vec::with_capacity(min_len);
-
-        let max_definition = match (self.level_type, level_type) {
-            // Handle the illegal cases
-            (_, LevelType::Root) => {
-                unreachable!("Cannot have a root as a child")
-            }
-            (LevelType::Primitive(_), _) => {
-                unreachable!("Cannot have a primitive parent for any type")
-            }
-            // The general case
-            (_, _) => self.max_definition + level_type.level_increment(),
+    /// Note: MapArrays are ListArray<i32> under the hood and so are dispatched to this method
+    fn write_list<O: OffsetSizeTrait>(
+        &mut self,
+        offsets: &[O],
+        list_data: &ArrayData,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
         };
 
-        match (self.level_type, level_type) {
-            (LevelType::List(_), LevelType::List(is_nullable)) => {
-                // Parent is a list or descendant of a list, and child is a list
-                let reps = self.repetition.clone().unwrap();
-
-                // List is null, and not empty
-                let l1 = max_definition - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                let mut nulls_seen = 0;
-
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        // If the parent length is 0, there won't be a slot for the child
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(0);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+        let offsets = &offsets[range.start..range.end + 1];
+        let child_array = make_array(list_data.child_data()[0].clone());
+
+        let write_non_null_slice =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                child.write(&child_array, start_idx..end_idx);
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    let mut rev = rep_levels.iter_mut().rev();
+                    let mut remaining = end_idx - start_idx;
+
+                    loop {
+                        let next = rev.next().unwrap();
+                        if *next > ctx.rep_level {
+                            // Nested element - ignore
+                            continue;
+                        }
+
+                        remaining -= 1;
+                        if remaining == 0 {
+                            *next = ctx.rep_level - 1;
+                            break;
+                        }
+                    }
+                })
+            };
+
+        let write_empty_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 1);
+            })
+        };
+
+        let write_null_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 2);
+            })
+        };
+
+        match list_data.null_bitmap() {
+            Some(nulls) => {
+                let null_offset = list_data.offset() + range.start;
+                for (idx, w) in offsets.windows(2).enumerate() {
+                    let is_valid = nulls.is_set(idx + null_offset);
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if !is_valid {
+                        write_null_slice(child)
+                    } else if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        (start..end).for_each(|parent_index| {
-                            let index = parent_index + nulls_seen - self.offset;
-                            let parent_index = parent_index - self.offset;
-
-                            // parent is either defined at this level, or earlier
-                            let parent_def = self.definition[index];
-                            let parent_rep = reps[index];
-                            let parent_mask = self.array_mask[index];
-
-                            // valid parent, index into children
-                            let child_start = array_offsets[parent_index] as usize;
-                            let child_end = array_offsets[parent_index + 1] as usize;
-                            let child_len = child_end - child_start;
-                            let child_mask = array_mask[parent_index];
-                            let merged_mask = parent_mask && child_mask;
-
-                            if child_len == 0 {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 });
-                                repetition.push(parent_rep);
-                                merged_array_mask.push(merged_mask);
-                            } else {
-                                (child_start..child_end).for_each(|child_index| {
-                                    let rep = match (
-                                        parent_index == start,
-                                        child_index == child_start,
-                                    ) {
-                                        (true, true) => parent_rep,
-                                        (true, false) => parent_rep + 2,
-                                        (false, true) => parent_rep,
-                                        (false, false) => parent_rep + 1,
-                                    };
-
-                                    definition.push(if !parent_mask {
-                                        parent_def
-                                    } else if child_mask {
-                                        l3
-                                    } else {
-                                        l1
-                                    });
-                                    repetition.push(rep);
-                                    merged_array_mask.push(merged_mask);
-                                });
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (LevelType::List(_), _) => {
-                // List and primitive (or struct).
-                // The list can have more values than the primitive, indicating that there
-                // are slots where the list is empty. We use a counter to track this behaviour.
-                let mut nulls_seen = 0;
-
-                // let child_max_definition = list_max_definition + is_nullable as i16;
-                // child values are a function of parent list offsets
-                let reps = self.repetition.as_deref().unwrap();
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(reps[index]);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+            None => {
+                for w in offsets.windows(2) {
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        // iterate through the array, adjusting child definitions for nulls
-                        (start..end).for_each(|child_index| {
-                            let index = child_index + nulls_seen - self.offset;
-                            let child_mask = array_mask[child_index - self.offset];
-                            let parent_mask = self.array_mask[index];
-                            let parent_def = self.definition[index];
-
-                            if !parent_mask || parent_def < self.max_definition {
-                                definition.push(parent_def);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(parent_mask);
-                            } else {
-                                definition.push(max_definition - !child_mask as i16);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(child_mask);
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets: self.array_offsets.clone(),
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (_, LevelType::List(is_nullable)) => {
-                // Encountering a list for the first time.
-                // Calculate the 2 list hierarchy definitions in advance
-
-                // List is null, and not empty
-                let l1 = max_definition - 1 - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                self.definition
-                    .iter()
-                    .enumerate()
-                    .for_each(|(parent_index, def)| {
-                        let child_from = array_offsets[parent_index];
-                        let child_to = array_offsets[parent_index + 1];
-                        let child_len = child_to - child_from;
-                        let child_mask = array_mask[parent_index];
-                        let parent_mask = self.array_mask[parent_index];
-
-                        match (parent_mask, child_len) {
-                            (true, 0) => {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 });
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (false, 0) => {
-                                // Inherit the parent definition as parent was null
-                                definition.push(*def);
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (true, _) => {
-                                (child_from..child_to).for_each(|child_index| {
-                                    // l1 and l3 make sense as list is not empty,
-                                    // but we reflect that it's either null or not
-                                    definition.push(if child_mask { l3 } else { l1 });
-                                    // Mark the first child slot as 0, and the next as 1
-                                    repetition.push(if child_index == child_from {
-                                        0
-                                    } else {
-                                        1
-                                    });
-                                    merged_array_mask.push(child_mask);
-                                });
+        }
+    }
+
+    /// Write `range` elements from StructArray `array`
+    fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
+        let (children, ctx) = match self {
+            Self::Struct(children, ctx) => (children, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+            for child in children {
+                child.visit_leaves(|info| {
+                    let len = range.end - range.start;
+
+                    let def_levels = info.def_levels.as_mut().unwrap();
+                    def_levels.reserve(len);
+                    for _ in 0..len {
+                        def_levels.push(ctx.def_level - 1);
+                    }

Review Comment:
   I wonder if this would be more "idiomatic" if it were written like:
   ```rust
   def_levels
     .extend(
        std::iter::repeat(ctx.def_level - 1)
       .take(len)
     )
   ```
   
   ?  Though I admit the current code seems more readable 👍 



##########
parquet/src/arrow/levels.rs:
##########
@@ -1634,28 +896,14 @@ mod tests {
 
         let batch = reader.next().unwrap().unwrap();
 
-        let expected_batch_level = LevelInfo {

Review Comment:
   what happened to this test?



##########
parquet/src/arrow/levels.rs:
##########
@@ -1331,50 +662,27 @@ mod tests {
 
         let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
 
-        let expected_batch_level = LevelInfo {
-            definition: vec![0; 2],
-            repetition: None,
-            array_offsets: (0..=2).collect(),
-            array_mask: vec![true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 2,
-            length: 2,
-        };
-
-        let batch_level = LevelInfo::new(2, 2);
-        assert_eq!(&batch_level, &expected_batch_level);
-
         // calculate the list's level
         let mut levels = vec![];
         batch
             .columns()
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels = batch_level.calculate_array_levels(array, field);
+                let mut array_levels = calculate_array_levels(array, field).unwrap();
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 1);
 
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![0, 3, 3, 3],
-            repetition: Some(vec![0, 0, 1, 1]),
-            array_offsets: vec![3, 3, 6],
-            array_mask: vec![false, true, true, true],
-            max_definition: 3,
-            level_type: LevelType::Primitive(true),
-            offset: 3,
-            length: 3,
+            def_levels: Some(vec![2, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2]),

Review Comment:
   Can you explain why these are different from the original test? Originally the `definition` vector had 4 elements, but in this PR `def_levels` has 11 🤔



##########
parquet/src/arrow/levels.rs:
##########
@@ -825,495 +483,168 @@ mod tests {
 
     use arrow::array::*;
     use arrow::buffer::Buffer;
-    use arrow::datatypes::{Schema, ToByteSlice};
+    use arrow::datatypes::{Int32Type, Schema, ToByteSlice};
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_columns;
 
     #[test]
     fn test_calculate_array_levels_twitter_example() {
         // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
         // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
-        let parent_levels = LevelInfo {
-            definition: vec![0, 0],
-            repetition: None,
-            array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
-            array_mask: vec![true, true], // both lists defined
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 2,
-        };
-        // offset into array, each level1 has 2 values
-        let array_offsets = vec![0, 2, 4];
-        let array_mask = vec![true, true];
-
-        // calculate level1 levels
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        //
-        let expected_levels = LevelInfo {
-            definition: vec![1, 1, 1, 1],
-            repetition: Some(vec![0, 1, 0, 1]),
-            array_offsets,
-            array_mask: vec![true, true, true, true],
-            max_definition: 1,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 4,
-        };
-        // the separate asserts make it easier to see what's failing
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        // this assert is to help if there are more variables added to the struct
-        assert_eq!(&levels, &expected_levels);
-
-        // level2
-        let parent_levels = levels;
-        let array_offsets = vec![0, 3, 7, 8, 10];
-        let array_mask = vec![true, true, true, true];
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        let expected_levels = LevelInfo {
-            definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
-            repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
-            array_offsets,
-            array_mask: vec![true; 10],
-            max_definition: 2,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 10,
+
+        let leaf_type = Field::new("item", DataType::Int32, false);
+        let inner_type = DataType::List(Box::new(leaf_type));
+        let inner_field = Field::new("l2", inner_type.clone(), false);
+        let outer_type = DataType::List(Box::new(inner_field));
+        let outer_field = Field::new("l1", outer_type.clone(), false);
+
+        let primitives = Int32Array::from_iter(0..10);
+
+        // Cannot use from_iter_primitive as always infers nullable
+        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
+        let inner_list = ArrayDataBuilder::new(inner_type)
+            .len(4)
+            .add_buffer(offsets)
+            .add_child_data(primitives.data().clone())
+            .build()
+            .unwrap();
+
+        let offsets = Buffer::from_iter([0_i32, 2, 4]);
+        let outer_list = ArrayDataBuilder::new(outer_type)
+            .len(2)
+            .add_buffer(offsets)
+            .add_child_data(inner_list)
+            .build()
+            .unwrap();
+        let outer_list = make_array(outer_list);
+
+        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected = LevelInfo {
+            def_levels: Some(vec![2; 10]),
+            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
+            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+            max_def_level: 2,
+            max_rep_level: 2,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected);
     }
 
     #[test]
     fn test_calculate_one_level_1() {
         // This test calculates the levels for a non-null primitive array
-        let parent_levels = LevelInfo {
-            definition: vec![0; 10],
-            repetition: None,
-            array_offsets: (0..=10).collect(),
-            array_mask: vec![true; 10],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 10,
-        };
-        let array_offsets: Vec<i64> = (0..=10).collect();
-        let array_mask = vec![true; 10];
+        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
+        let field = Field::new("item", DataType::Int32, false);
+
+        let levels = calculate_array_levels(&array, &field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask.clone(),
-            LevelType::Primitive(false),
-        );
         let expected_levels = LevelInfo {
-            // As it is non-null, definitions can be omitted
-            definition: vec![0; 10],
-            repetition: None,
-            array_offsets,
-            array_mask,
-            max_definition: 0,
-            level_type: LevelType::Primitive(false),
-            offset: 0,
-            length: 10,
+            def_levels: None,
+            rep_levels: None,
+            non_null_indices: (0..10).collect(),
+            max_def_level: 0,
+            max_rep_level: 0,
         };
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
     fn test_calculate_one_level_2() {
-        // This test calculates the levels for a non-null primitive array
-        let parent_levels = LevelInfo {
-            definition: vec![0; 5],
-            repetition: None,
-            array_offsets: (0..=5).collect(),
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 5,
-        };
-        let array_offsets: Vec<i64> = (0..=5).collect();
-        let array_mask = vec![true, false, true, true, false];
+        // This test calculates the levels for a nullable primitive array
+        let array = Arc::new(Int32Array::from_iter([
+            Some(0),
+            None,
+            Some(0),
+            Some(0),
+            None,
+        ])) as ArrayRef;
+        let field = Field::new("item", DataType::Int32, true);
+
+        let levels = calculate_array_levels(&array, &field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask.clone(),
-            LevelType::Primitive(true),
-        );
         let expected_levels = LevelInfo {
-            definition: vec![1, 0, 1, 1, 0],
-            repetition: None,
-            array_offsets,
-            array_mask,
-            max_definition: 1,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![1, 0, 1, 1, 0]),
+            rep_levels: None,
+            non_null_indices: vec![0, 2, 3],
+            max_def_level: 1,
+            max_rep_level: 0,
         };
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
     fn test_calculate_array_levels_1() {
+        let leaf_field = Field::new("item", DataType::Int32, false);
+        let list_field = Field::new("list", DataType::List(Box::new(leaf_field)), false);
+
         // if all array values are defined (e.g. batch<list<_>>)
         // [[0], [1], [2], [3], [4]]
-        let parent_levels = LevelInfo {
-            definition: vec![0; 5],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 5,
+
+        let leaf_array = Int32Array::from_iter(0..5);
+        // Cannot use from_iter_primitive as always infers nullable
+        let offsets = Buffer::from_iter(0_i32..6);
+        let list = ArrayDataBuilder::new(list_field.data_type().clone())
+            .len(5)
+            .add_buffer(offsets)
+            .add_child_data(leaf_array.data().clone())
+            .build()
+            .unwrap();
+        let list = make_array(list);
+
+        let levels = calculate_array_levels(&list, &list_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected_levels = LevelInfo {
+            def_levels: Some(vec![1; 5]),
+            rep_levels: Some(vec![0; 5]),
+            non_null_indices: (0..5).collect(),
+            max_def_level: 1,
+            max_rep_level: 1,
         };
-        let array_offsets = vec![0, 2, 2, 4, 8, 11];
-        let array_mask = vec![true, false, true, true, true];
+        assert_eq!(&levels[0], &expected_levels);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(true),
-        );
-        // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
+        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
         // all values are defined as we do not have nulls on the root (batch)
         // repetition:
         //   0: 0, 1
-        //   1:
+        //   1: 0
         //   2: 0, 1
         //   3: 0, 1, 1, 1
         //   4: 0, 1, 1
-        let expected_levels = LevelInfo {
-            // The levels are normally 2 because we:
-            // - Calculate the level at the list
-            // - Calculate the level at the list's child
-            // We do not do this in these tests, thus the levels are 1 less.
-            definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
-            repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
-            array_offsets,
-            array_mask: vec![
-                true, true, false, true, true, true, true, true, true, true, true, true,
-            ],
-            max_definition: 2,
-            level_type: LevelType::List(true),
-            offset: 0,
-            length: 11, // the child has 11 slots
-        };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
-    }
-
-    #[test]
-    fn test_calculate_array_levels_2() {
-        // If some values are null
-        //
-        // This emulates an array in the form: <struct<list<?>>
-        // with values:
-        // - 0: [0, 1], but is null because of the struct
-        // - 1: []
-        // - 2: [2, 3], but is null because of the struct
-        // - 3: [4, 5, 6, 7]
-        // - 4: [8, 9, 10]
-        //
-        // If the first values of a list are null due to a parent, we have to still account for them
-        // while indexing, because they would affect the way the child is indexed
-        // i.e. in the above example, we have to know that [0, 1] has to be skipped
-        let parent_levels = LevelInfo {
-            definition: vec![0, 1, 0, 1, 1],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![false, true, false, true, true],
-            max_definition: 1,
-            level_type: LevelType::Struct(true),
-            offset: 0,
-            length: 5,
-        };
-        let array_offsets = vec![0, 2, 2, 4, 8, 11];
-        let array_mask = vec![true, false, true, true, true];
+        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);

Review Comment:
   we seem to have lost a substantial number of verifications in this test. Is that intended?



##########
parquet/src/arrow/levels.rs:
##########
@@ -825,495 +483,168 @@ mod tests {
 
     use arrow::array::*;
     use arrow::buffer::Buffer;
-    use arrow::datatypes::{Schema, ToByteSlice};
+    use arrow::datatypes::{Int32Type, Schema, ToByteSlice};
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_columns;
 
     #[test]
     fn test_calculate_array_levels_twitter_example() {
         // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
         // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
-        let parent_levels = LevelInfo {
-            definition: vec![0, 0],
-            repetition: None,
-            array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
-            array_mask: vec![true, true], // both lists defined
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 2,
-        };
-        // offset into array, each level1 has 2 values
-        let array_offsets = vec![0, 2, 4];
-        let array_mask = vec![true, true];
-
-        // calculate level1 levels
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        //
-        let expected_levels = LevelInfo {
-            definition: vec![1, 1, 1, 1],
-            repetition: Some(vec![0, 1, 0, 1]),
-            array_offsets,
-            array_mask: vec![true, true, true, true],
-            max_definition: 1,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 4,
-        };
-        // the separate asserts make it easier to see what's failing
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        // this assert is to help if there are more variables added to the struct
-        assert_eq!(&levels, &expected_levels);
-
-        // level2
-        let parent_levels = levels;
-        let array_offsets = vec![0, 3, 7, 8, 10];
-        let array_mask = vec![true, true, true, true];
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        let expected_levels = LevelInfo {
-            definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
-            repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
-            array_offsets,
-            array_mask: vec![true; 10],
-            max_definition: 2,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 10,
+
+        let leaf_type = Field::new("item", DataType::Int32, false);
+        let inner_type = DataType::List(Box::new(leaf_type));
+        let inner_field = Field::new("l2", inner_type.clone(), false);
+        let outer_type = DataType::List(Box::new(inner_field));
+        let outer_field = Field::new("l1", outer_type.clone(), false);
+
+        let primitives = Int32Array::from_iter(0..10);
+
+        // Cannot use from_iter_primitive as always infers nullable
+        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
+        let inner_list = ArrayDataBuilder::new(inner_type)
+            .len(4)
+            .add_buffer(offsets)
+            .add_child_data(primitives.data().clone())
+            .build()
+            .unwrap();
+
+        let offsets = Buffer::from_iter([0_i32, 2, 4]);
+        let outer_list = ArrayDataBuilder::new(outer_type)
+            .len(2)
+            .add_buffer(offsets)
+            .add_child_data(inner_list)
+            .build()
+            .unwrap();
+        let outer_list = make_array(outer_list);
+
+        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected = LevelInfo {

Review Comment:
   The original test also verified `level1`, but this PR now appears to verify only `levels[0]` (what was called "level2"?)



##########
parquet/src/arrow/levels.rs:
##########
@@ -1475,81 +769,51 @@ mod tests {
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![0, 0, 0, 0, 0],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Primitive(false),
-            offset: 0,
-            length: 5,
+            def_levels: None,

Review Comment:
   Is it expected that this goes from a `definition` of all zeros to `def_levels: None`?



##########
parquet/src/arrow/levels.rs:
##########
@@ -171,650 +89,390 @@ impl LevelInfo {
             | DataType::Binary
             | DataType::LargeBinary
             | DataType::Decimal(_, _)
-            | DataType::FixedSizeBinary(_) => {
-                // we return a vector of 1 value to represent the primitive
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            | DataType::FixedSizeBinary(_)
+    )
+}
+
+/// The definition and repetition level of an array within a potentially nested hierarchy
+#[derive(Debug, Default, Clone, Copy)]
+struct LevelContext {
+    /// The current repetition level
+    rep_level: i16,
+    /// The current definition level
+    def_level: i16,
+}
+
+/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+enum LevelInfoBuilder {
+    Primitive(LevelInfo),
+    List(Box<LevelInfoBuilder>, LevelContext),

Review Comment:
   It might help to add some docstrings here explaining what the `Box<LevelInfoBuilder>` and `Vec<LevelInfoBuilder>` represent



##########
parquet/src/arrow/levels.rs:
##########
@@ -171,650 +89,390 @@ impl LevelInfo {
             | DataType::Binary
             | DataType::LargeBinary
             | DataType::Decimal(_, _)
-            | DataType::FixedSizeBinary(_) => {
-                // we return a vector of 1 value to represent the primitive
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            | DataType::FixedSizeBinary(_)
+    )
+}
+
+/// The definition and repetition level of an array within a potentially nested hierarchy
+#[derive(Debug, Default, Clone, Copy)]
+struct LevelContext {
+    /// The current repetition level
+    rep_level: i16,
+    /// The current definition level
+    def_level: i16,
+}
+
+/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+enum LevelInfoBuilder {
+    Primitive(LevelInfo),
+    List(Box<LevelInfoBuilder>, LevelContext),
+    Struct(Vec<LevelInfoBuilder>, LevelContext),
+}
+
+impl LevelInfoBuilder {
+    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
+    fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
+        match field.data_type() {
+            d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
+                parent_ctx,
+                field.is_nullable(),
+            ))),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => Ok(Self::Primitive(
+                LevelInfo::new(parent_ctx, field.is_nullable()),
+            )),
+            DataType::Struct(children) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 1,
+                    false => parent_ctx.def_level,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level,
+                    def_level,
+                };
+
+                let children = children
+                    .iter()
+                    .map(|f| Self::try_new(f, ctx))
+                    .collect::<Result<_>>()?;
+
+                Ok(Self::Struct(children, ctx))
             }
-            DataType::List(list_field) | DataType::LargeList(list_field) => {
-                let child_offset = array_offsets[0] as usize;
-                let child_len = *array_offsets.last().unwrap() as usize;
-                // Calculate the list level
-                let list_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::List(field.is_nullable()),
-                );
-
-                // Construct the child array of the list, and get its offset + mask
-                let array_data = array.data();
-                let child_data = array_data.child_data().get(0).unwrap();
-                let child_array = make_array(child_data.clone());
-                let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
-                    &child_array,
-                    child_offset,
-                    child_len - child_offset,
-                );
-
-                match child_array.data_type() {
-                    DataType::Null
-                    | DataType::Boolean
-                    | DataType::Int8
-                    | DataType::Int16
-                    | DataType::Int32
-                    | DataType::Int64
-                    | DataType::UInt8
-                    | DataType::UInt16
-                    | DataType::UInt32
-                    | DataType::UInt64
-                    | DataType::Float16
-                    | DataType::Float32
-                    | DataType::Float64
-                    | DataType::Timestamp(_, _)
-                    | DataType::Date32
-                    | DataType::Date64
-                    | DataType::Time32(_)
-                    | DataType::Time64(_)
-                    | DataType::Duration(_)
-                    | DataType::Interval(_)
-                    | DataType::Binary
-                    | DataType::LargeBinary
-                    | DataType::Utf8
-                    | DataType::LargeUtf8
-                    | DataType::Dictionary(_, _)
-                    | DataType::Decimal(_, _)
-                    | DataType::FixedSizeBinary(_) => {
-                        vec![list_level.calculate_child_levels(
-                            child_offsets,
-                            child_mask,
-                            LevelType::Primitive(list_field.is_nullable()),
-                        )]
-                    }
-                    DataType::List(_)
-                    | DataType::LargeList(_)
-                    | DataType::Struct(_)
-                    | DataType::Map(_, _) => {
-                        list_level.calculate_array_levels(&child_array, list_field)
-                    }
-                    DataType::FixedSizeList(_, _) => unimplemented!(),
-                    DataType::Union(_, _, _) => unimplemented!(),
-                }
+            DataType::List(child)
+            | DataType::LargeList(child)
+            | DataType::Map(child, _) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 2,
+                    false => parent_ctx.def_level + 1,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level + 1,
+                    def_level,
+                };
+
+                let child = Self::try_new(child.as_ref(), ctx)?;
+                Ok(Self::List(Box::new(child), ctx))
             }
-            DataType::Map(map_field, _) => {
-                // Calculate the map level
-                let map_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    // A map is treated like a list as it has repetition
-                    LevelType::List(field.is_nullable()),
-                );
-
-                let map_array = array.as_any().downcast_ref::<MapArray>().unwrap();
-
-                let key_array = map_array.keys();
-                let value_array = map_array.values();
-
-                if let DataType::Struct(fields) = map_field.data_type() {
-                    let key_field = &fields[0];
-                    let value_field = &fields[1];
-
-                    let mut map_levels = vec![];
-
-                    // Get key levels
-                    let mut key_levels =
-                        map_level.calculate_array_levels(&key_array, key_field);
-                    map_levels.append(&mut key_levels);
-
-                    let mut value_levels =
-                        map_level.calculate_array_levels(&value_array, value_field);
-                    map_levels.append(&mut value_levels);
-
-                    map_levels
-                } else {
-                    panic!(
-                        "Map field should be a struct, found {:?}",
-                        map_field.data_type()
-                    );
-                }
+            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
+        }
+    }
+
+    /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns
+    /// as enumerated by a depth-first search
+    fn finish(self) -> Vec<LevelInfo> {
+        match self {
+            LevelInfoBuilder::Primitive(v) => vec![v],
+            LevelInfoBuilder::List(v, _) => v.finish(),
+            LevelInfoBuilder::Struct(v, _) => {
+                v.into_iter().flat_map(|l| l.finish()).collect()
             }
-            DataType::FixedSizeList(_, _) => unimplemented!(),
-            DataType::Struct(struct_fields) => {
-                let struct_array: &StructArray = array
+        }
+    }
+
+    /// Given an `array`, write the level data for the elements in `range`
+    fn write(&mut self, array: &ArrayRef, range: Range<usize>) {
+        match array.data_type() {
+            d if is_leaf(d) => self.write_leaf(array, range),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+                self.write_leaf(array, range)
+            }
+            DataType::Struct(_) => {
+                let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+                self.write_struct(array, range)
+            }
+            DataType::List(_) => {
+                let array = array
                     .as_any()
-                    .downcast_ref::<StructArray>()
-                    .expect("Unable to get struct array");
-                let mut struct_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Struct(field.is_nullable()),
-                );
-
-                // If the parent field is a list, calculate the children of the struct as if it
-                // were a list as well.
-                if matches!(self.level_type, LevelType::List(_)) {
-                    struct_level.level_type = LevelType::List(false);
-                }
+                    .downcast_ref::<GenericListArray<i32>>()
+                    .unwrap();
+                self.write_list(array.value_offsets(), array.data(), range)
+            }
+            DataType::LargeList(_) => {
+                let array = array
+                    .as_any()
+                    .downcast_ref::<GenericListArray<i64>>()
+                    .unwrap();
 
-                let mut struct_levels = vec![];
-                struct_array
-                    .columns()
-                    .into_iter()
-                    .zip(struct_fields)
-                    .for_each(|(child_array, child_field)| {
-                        let mut levels =
-                            struct_level.calculate_array_levels(child_array, child_field);
-                        struct_levels.append(&mut levels);
-                    });
-                struct_levels
+                self.write_list(array.value_offsets(), array.data(), range)
             }
-            DataType::Union(_, _, _) => unimplemented!(),
-            DataType::Dictionary(_, _) => {
-                // Need to check for these cases not implemented in C++:
-                // - "Writing DictionaryArray with nested dictionary type not yet supported"
-                // - "Writing DictionaryArray with null encoded in dictionary type not yet supported"
-                // vec![self.get_primitive_def_levels(array, field, array_mask)]
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            DataType::Map(_, _) => {
+                let array = array.as_any().downcast_ref::<MapArray>().unwrap();
+                // A Map is just as ListArray<i32> with a StructArray child, we therefore
+                // treat it as such to avoid code duplication
+                self.write_list(array.value_offsets(), array.data(), range)
             }
+            _ => unreachable!(),
         }
     }
 
-    /// Calculate child/leaf array levels.
+    /// Write `range` elements from ListArray `array`
     ///
-    /// The algorithm works by incrementing definitions of array values based on whether:
-    /// - a value is optional or required (is_nullable)
-    /// - a list value is repeated + optional or required (is_list)
-    ///
-    /// A record batch always starts at a populated definition = level 0.
-    /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column `a`
-    /// can only have a maximum level of 1 if it is not null.
-    /// If it is not null, we increment by 1, such that the null slots will = level 1.
-    /// The above applies to types that have no repetition (anything not a list or map).
-    ///
-    /// If a batch has lists, then we increment by up to 2 levels:
-    /// - 1 level for the list (repeated)
-    /// - 1 level if the list itself is nullable (optional)
-    ///
-    /// A list's child then gets incremented using the above rules.
-    ///
-    /// *Exceptions*
-    ///
-    /// There are 2 exceptions from the above rules:
-    ///
-    /// 1. When at the root of the schema: We always increment the
-    /// level regardless of whether the child is nullable or not. If we do not do
-    /// this, we could have a non-nullable array having a definition of 0.
-    ///
-    /// 2. List parent, non-list child: We always increment the level in this case,
-    /// regardless of whether the child is nullable or not.
-    ///
-    /// *Examples*
-    ///
-    /// A batch with only a primitive that's non-nullable. `<primitive[required]>`:
-    /// * We don't increment the definition level as the array is not optional.
-    /// * This would leave us with a definition of 0, so the first exception applies.
-    /// * The definition level becomes 1.
-    ///
-    /// A batch with only a primitive that's nullable. `<primitive[optional]>`:
-    /// * The definition level becomes 1, as we increment it once.
-    ///
-    /// A batch with a single non-nullable list (both list and child not null):
-    /// * We calculate the level twice, for the list, and for the child.
-    /// * At the list, the level becomes 1, where 0 indicates that the list is
-    ///  empty, and 1 says it's not (determined through offsets).
-    /// * At the primitive level, the second exception applies. The level becomes 2.
-    fn calculate_child_levels(
-        &self,
-        // we use 64-bit offsets to also accommodate large arrays
-        array_offsets: Vec<i64>,
-        array_mask: Vec<bool>,
-        level_type: LevelType,
-    ) -> Self {
-        let min_len = *(array_offsets.last().unwrap()) as usize;
-        let mut definition = Vec::with_capacity(min_len);
-        let mut repetition = Vec::with_capacity(min_len);
-        let mut merged_array_mask = Vec::with_capacity(min_len);
-
-        let max_definition = match (self.level_type, level_type) {
-            // Handle the illegal cases
-            (_, LevelType::Root) => {
-                unreachable!("Cannot have a root as a child")
-            }
-            (LevelType::Primitive(_), _) => {
-                unreachable!("Cannot have a primitive parent for any type")
-            }
-            // The general case
-            (_, _) => self.max_definition + level_type.level_increment(),
+    /// Note: MapArrays are ListArray<i32> under the hood and so are dispatched to this method
+    fn write_list<O: OffsetSizeTrait>(
+        &mut self,
+        offsets: &[O],
+        list_data: &ArrayData,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
         };
 
-        match (self.level_type, level_type) {
-            (LevelType::List(_), LevelType::List(is_nullable)) => {
-                // Parent is a list or descendant of a list, and child is a list
-                let reps = self.repetition.clone().unwrap();
-
-                // List is null, and not empty
-                let l1 = max_definition - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                let mut nulls_seen = 0;
-
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        // If the parent length is 0, there won't be a slot for the child
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(0);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+        let offsets = &offsets[range.start..range.end + 1];
+        let child_array = make_array(list_data.child_data()[0].clone());
+
+        let write_non_null_slice =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                child.write(&child_array, start_idx..end_idx);
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    let mut rev = rep_levels.iter_mut().rev();
+                    let mut remaining = end_idx - start_idx;
+
+                    loop {
+                        let next = rev.next().unwrap();
+                        if *next > ctx.rep_level {
+                            // Nested element - ignore
+                            continue;
+                        }
+
+                        remaining -= 1;
+                        if remaining == 0 {
+                            *next = ctx.rep_level - 1;
+                            break;
+                        }
+                    }
+                })
+            };
+
+        let write_empty_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 1);
+            })
+        };
+
+        let write_null_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 2);
+            })
+        };
+
+        match list_data.null_bitmap() {
+            Some(nulls) => {
+                let null_offset = list_data.offset() + range.start;
+                for (idx, w) in offsets.windows(2).enumerate() {
+                    let is_valid = nulls.is_set(idx + null_offset);
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if !is_valid {
+                        write_null_slice(child)
+                    } else if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        (start..end).for_each(|parent_index| {
-                            let index = parent_index + nulls_seen - self.offset;
-                            let parent_index = parent_index - self.offset;
-
-                            // parent is either defined at this level, or earlier
-                            let parent_def = self.definition[index];
-                            let parent_rep = reps[index];
-                            let parent_mask = self.array_mask[index];
-
-                            // valid parent, index into children
-                            let child_start = array_offsets[parent_index] as usize;
-                            let child_end = array_offsets[parent_index + 1] as usize;
-                            let child_len = child_end - child_start;
-                            let child_mask = array_mask[parent_index];
-                            let merged_mask = parent_mask && child_mask;
-
-                            if child_len == 0 {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 });
-                                repetition.push(parent_rep);
-                                merged_array_mask.push(merged_mask);
-                            } else {
-                                (child_start..child_end).for_each(|child_index| {
-                                    let rep = match (
-                                        parent_index == start,
-                                        child_index == child_start,
-                                    ) {
-                                        (true, true) => parent_rep,
-                                        (true, false) => parent_rep + 2,
-                                        (false, true) => parent_rep,
-                                        (false, false) => parent_rep + 1,
-                                    };
-
-                                    definition.push(if !parent_mask {
-                                        parent_def
-                                    } else if child_mask {
-                                        l3
-                                    } else {
-                                        l1
-                                    });
-                                    repetition.push(rep);
-                                    merged_array_mask.push(merged_mask);
-                                });
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (LevelType::List(_), _) => {
-                // List and primitive (or struct).
-                // The list can have more values than the primitive, indicating that there
-                // are slots where the list is empty. We use a counter to track this behaviour.
-                let mut nulls_seen = 0;
-
-                // let child_max_definition = list_max_definition + is_nullable as i16;
-                // child values are a function of parent list offsets
-                let reps = self.repetition.as_deref().unwrap();
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(reps[index]);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+            None => {
+                for w in offsets.windows(2) {
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        // iterate through the array, adjusting child definitions for nulls
-                        (start..end).for_each(|child_index| {
-                            let index = child_index + nulls_seen - self.offset;
-                            let child_mask = array_mask[child_index - self.offset];
-                            let parent_mask = self.array_mask[index];
-                            let parent_def = self.definition[index];
-
-                            if !parent_mask || parent_def < self.max_definition {
-                                definition.push(parent_def);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(parent_mask);
-                            } else {
-                                definition.push(max_definition - !child_mask as i16);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(child_mask);
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets: self.array_offsets.clone(),
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (_, LevelType::List(is_nullable)) => {
-                // Encountering a list for the first time.
-                // Calculate the 2 list hierarchy definitions in advance
-
-                // List is null, and not empty
-                let l1 = max_definition - 1 - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                self.definition
-                    .iter()
-                    .enumerate()
-                    .for_each(|(parent_index, def)| {
-                        let child_from = array_offsets[parent_index];
-                        let child_to = array_offsets[parent_index + 1];
-                        let child_len = child_to - child_from;
-                        let child_mask = array_mask[parent_index];
-                        let parent_mask = self.array_mask[parent_index];
-
-                        match (parent_mask, child_len) {
-                            (true, 0) => {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 });
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (false, 0) => {
-                                // Inherit the parent definition as parent was null
-                                definition.push(*def);
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (true, _) => {
-                                (child_from..child_to).for_each(|child_index| {
-                                    // l1 and l3 make sense as list is not empty,
-                                    // but we reflect that it's either null or not
-                                    definition.push(if child_mask { l3 } else { l1 });
-                                    // Mark the first child slot as 0, and the next as 1
-                                    repetition.push(if child_index == child_from {
-                                        0
-                                    } else {
-                                        1
-                                    });
-                                    merged_array_mask.push(child_mask);
-                                });
+        }
+    }
+
+    /// Write `range` elements from StructArray `array`
+    fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
+        let (children, ctx) = match self {
+            Self::Struct(children, ctx) => (children, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+            for child in children {
+                child.visit_leaves(|info| {
+                    let len = range.end - range.start;
+
+                    let def_levels = info.def_levels.as_mut().unwrap();
+                    def_levels.reserve(len);
+                    for _ in 0..len {
+                        def_levels.push(ctx.def_level - 1);
+                    }
+
+                    if let Some(rep_levels) = info.rep_levels.as_mut() {
+                        rep_levels.reserve(len);
+                        for _ in 0..len {
+                            rep_levels.push(ctx.rep_level)
+                        }
+                    }
+                })
+            }
+        };
+
+        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+            for (child_array, child) in array.columns().into_iter().zip(children) {
+                child.write(child_array, range.clone())
+            }
+        };
+
+        match array.data().null_bitmap() {
+            Some(validity) => {
+                let null_offset = array.data().offset();
+                let mut last_non_null_idx = None;
+                let mut last_null_idx = None;
+
+                // TODO: BitChunkIterator

Review Comment:
   Is this a TODO for this PR, or for a follow-on PR?



##########
parquet/src/arrow/levels.rs:
##########
@@ -1760,63 +1002,152 @@ mod tests {
 
         let array = Arc::new(list_builder.finish());
 
+        let values_len = array.data().child_data()[0].len();
+        assert_eq!(values_len, 5);
+
         let schema = Arc::new(Schema::new(vec![list_field]));
 
         let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
 
-        let batch_level = LevelInfo::new(0, rb.num_rows());
-        let list_level =
-            &batch_level.calculate_array_levels(rb.column(0), rb.schema().field(0))[0];
+        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
+        let list_level = &levels[0];
 
         let expected_level = LevelInfo {
-            definition: vec![4, 1, 0, 2, 2, 3, 4],
-            repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
-            array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
-            array_mask: vec![true, true, false, false, false, false, true],
-            max_definition: 4,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
+            rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
+            non_null_indices: vec![0, 4],
+            max_def_level: 4,
+            max_rep_level: 1,
         };
 
         assert_eq!(list_level, &expected_level);
     }
 
     #[test]
-    fn test_nested_indices() {
-        // Given a buffer like
-        // [0, null, null, 1, 2]
-        //
-        // The two level infos below might represent the two structures
-        // 1: [{a: 0}], [], null, [null, null], [{a: 1}], [{a: 2}]
-        // 2: [0], [], null, [null, null], [1], [2]
-        //
-        // (That is, their only difference is that the leaf values are nested one level deeper in a
-        // struct).
-
-        let level1 = LevelInfo {
-            definition: vec![4, 1, 0, 2, 2, 4, 4],
-            repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
-            array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
-            array_mask: vec![true, true, false, false, false, false, true],
-            max_definition: 4,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+    fn test_struct_mask_list() {
+        // Test a struct array masking a list

Review Comment:
   I don't understand what "masking a list" means in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org