You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/27 09:19:54 UTC

[arrow-rs] branch master updated: Support writing nested lists to parquet (#1746)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e1666a82 Support writing nested lists to parquet (#1746)
8e1666a82 is described below

commit 8e1666a8206f2eea4dd4e55c9365859c6a32a3f0
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri May 27 10:19:48 2022 +0100

    Support writing nested lists to parquet (#1746)
    
    * Support writing arbitrarily nested arrow arrays (#1744)
    
    * More tests
    
    * Port more tests
    
    * More tests
    
    * Review feedback
    
    * Reduce test churn
    
    * Port remaining tests
    
    * Review feedback
    
    * Fix clippy
---
 parquet/src/arrow/arrow_writer.rs |   90 +-
 parquet/src/arrow/levels.rs       | 2125 +++++++++++++++----------------------
 2 files changed, 897 insertions(+), 1318 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index a5162045b..530dfe2ad 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -33,6 +33,7 @@ use super::schema::{
     decimal_length_from_precision,
 };
 
+use crate::arrow::levels::calculate_array_levels;
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
@@ -173,16 +174,15 @@ impl<W: Write> ArrowWriter<W> {
                 }
             }
 
-            let mut levels: Vec<_> = arrays
+            let mut levels = arrays
                 .iter()
                 .map(|array| {
-                    let batch_level = LevelInfo::new(0, array.len());
-                    let mut levels = batch_level.calculate_array_levels(array, field);
+                    let mut levels = calculate_array_levels(array, field)?;
                     // Reverse levels as we pop() them when writing arrays
                     levels.reverse();
-                    levels
+                    Ok(levels)
                 })
-                .collect();
+                .collect::<Result<Vec<_>>>()?;
 
             write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
         }
@@ -341,26 +341,23 @@ fn write_leaf(
     column: &ArrayRef,
     levels: LevelInfo,
 ) -> Result<i64> {
-    let indices = levels.filter_array_indices();
-    // Slice array according to computed offset and length
-    let column = column.slice(levels.offset, levels.length);
+    let indices = levels.non_null_indices();
     let written = match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
             let values = match column.data_type() {
                 ArrowDataType::Date64 => {
                     // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
                     let array = if let ArrowDataType::Date64 = column.data_type() {
-                        let array =
-                            arrow::compute::cast(&column, &ArrowDataType::Date32)?;
+                        let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
                         arrow::compute::cast(&array, &ArrowDataType::Int32)?
                     } else {
-                        arrow::compute::cast(&column, &ArrowDataType::Int32)?
+                        arrow::compute::cast(column, &ArrowDataType::Int32)?
                     };
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int32Array>()
                         .expect("Unable to get int32 array");
-                    get_numeric_array_slice::<Int32Type, _>(array, &indices)
+                    get_numeric_array_slice::<Int32Type, _>(array, indices)
                 }
                 ArrowDataType::UInt32 => {
                     // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
@@ -373,21 +370,21 @@ fn write_leaf(
                         array,
                         |x| x as i32,
                     );
-                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                    get_numeric_array_slice::<Int32Type, _>(&array, indices)
                 }
                 _ => {
-                    let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?;
+                    let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int32Array>()
                         .expect("Unable to get i32 array");
-                    get_numeric_array_slice::<Int32Type, _>(array, &indices)
+                    get_numeric_array_slice::<Int32Type, _>(array, indices)
                 }
             };
             typed.write_batch(
                 values.as_slice(),
-                Some(levels.definition.as_slice()),
-                levels.repetition.as_deref(),
+                levels.def_levels(),
+                levels.rep_levels(),
             )?
         }
         ColumnWriter::BoolColumnWriter(ref mut typed) => {
@@ -396,9 +393,9 @@ fn write_leaf(
                 .downcast_ref::<arrow_array::BooleanArray>()
                 .expect("Unable to get boolean array");
             typed.write_batch(
-                get_bool_array_slice(array, &indices).as_slice(),
-                Some(levels.definition.as_slice()),
-                levels.repetition.as_deref(),
+                get_bool_array_slice(array, indices).as_slice(),
+                levels.def_levels(),
+                levels.rep_levels(),
             )?
         }
         ColumnWriter::Int64ColumnWriter(ref mut typed) => {
@@ -408,7 +405,7 @@ fn write_leaf(
                         .as_any()
                         .downcast_ref::<arrow_array::Int64Array>()
                         .expect("Unable to get i64 array");
-                    get_numeric_array_slice::<Int64Type, _>(array, &indices)
+                    get_numeric_array_slice::<Int64Type, _>(array, indices)
                 }
                 ArrowDataType::UInt64 => {
                     // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
@@ -421,21 +418,21 @@ fn write_leaf(
                         array,
                         |x| x as i64,
                     );
-                    get_numeric_array_slice::<Int64Type, _>(&array, &indices)
+                    get_numeric_array_slice::<Int64Type, _>(&array, indices)
                 }
                 _ => {
-                    let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?;
+                    let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int64Array>()
                         .expect("Unable to get i64 array");
-                    get_numeric_array_slice::<Int64Type, _>(array, &indices)
+                    get_numeric_array_slice::<Int64Type, _>(array, indices)
                 }
             };
             typed.write_batch(
                 values.as_slice(),
-                Some(levels.definition.as_slice()),
-                levels.repetition.as_deref(),
+                levels.def_levels(),
+                levels.rep_levels(),
             )?
         }
         ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
@@ -447,9 +444,9 @@ fn write_leaf(
                 .downcast_ref::<arrow_array::Float32Array>()
                 .expect("Unable to get Float32 array");
             typed.write_batch(
-                get_numeric_array_slice::<FloatType, _>(array, &indices).as_slice(),
-                Some(levels.definition.as_slice()),
-                levels.repetition.as_deref(),
+                get_numeric_array_slice::<FloatType, _>(array, indices).as_slice(),
+                levels.def_levels(),
+                levels.rep_levels(),
             )?
         }
         ColumnWriter::DoubleColumnWriter(ref mut typed) => {
@@ -458,9 +455,9 @@ fn write_leaf(
                 .downcast_ref::<arrow_array::Float64Array>()
                 .expect("Unable to get Float64 array");
             typed.write_batch(
-                get_numeric_array_slice::<DoubleType, _>(array, &indices).as_slice(),
-                Some(levels.definition.as_slice()),
-                levels.repetition.as_deref(),
+                get_numeric_array_slice::<DoubleType, _>(array, indices).as_slice(),
+                levels.def_levels(),
+                levels.rep_levels(),
             )?
         }
         ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
@@ -471,8 +468,8 @@ fn write_leaf(
                     .expect("Unable to get BinaryArray array");
                 typed.write_batch(
                     get_binary_array(array).as_slice(),
-                    Some(levels.definition.as_slice()),
-                    levels.repetition.as_deref(),
+                    levels.def_levels(),
+                    levels.rep_levels(),
                 )?
             }
             ArrowDataType::Utf8 => {
@@ -482,8 +479,8 @@ fn write_leaf(
                     .expect("Unable to get LargeBinaryArray array");
                 typed.write_batch(
                     get_string_array(array).as_slice(),
-                    Some(levels.definition.as_slice()),
-                    levels.repetition.as_deref(),
+                    levels.def_levels(),
+                    levels.rep_levels(),
                 )?
             }
             ArrowDataType::LargeBinary => {
@@ -493,8 +490,8 @@ fn write_leaf(
                     .expect("Unable to get LargeBinaryArray array");
                 typed.write_batch(
                     get_large_binary_array(array).as_slice(),
-                    Some(levels.definition.as_slice()),
-                    levels.repetition.as_deref(),
+                    levels.def_levels(),
+                    levels.rep_levels(),
                 )?
             }
             ArrowDataType::LargeUtf8 => {
@@ -504,8 +501,8 @@ fn write_leaf(
                     .expect("Unable to get LargeUtf8 array");
                 typed.write_batch(
                     get_large_string_array(array).as_slice(),
-                    Some(levels.definition.as_slice()),
-                    levels.repetition.as_deref(),
+                    levels.def_levels(),
+                    levels.rep_levels(),
                 )?
             }
             _ => unreachable!("Currently unreachable because data type not supported"),
@@ -518,14 +515,14 @@ fn write_leaf(
                             .as_any()
                             .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                             .unwrap();
-                        get_interval_ym_array_slice(array, &indices)
+                        get_interval_ym_array_slice(array, indices)
                     }
                     IntervalUnit::DayTime => {
                         let array = column
                             .as_any()
                             .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                             .unwrap();
-                        get_interval_dt_array_slice(array, &indices)
+                        get_interval_dt_array_slice(array, indices)
                     }
                     _ => {
                         return Err(ParquetError::NYI(
@@ -541,14 +538,14 @@ fn write_leaf(
                         .as_any()
                         .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                         .unwrap();
-                    get_fsb_array_slice(array, &indices)
+                    get_fsb_array_slice(array, indices)
                 }
                 ArrowDataType::Decimal(_, _) => {
                     let array = column
                         .as_any()
                         .downcast_ref::<arrow_array::DecimalArray>()
                         .unwrap();
-                    get_decimal_array_slice(array, &indices)
+                    get_decimal_array_slice(array, indices)
                 }
                 _ => {
                     return Err(ParquetError::NYI(
@@ -559,8 +556,8 @@ fn write_leaf(
             };
             typed.write_batch(
                 bytes.as_slice(),
-                Some(levels.definition.as_slice()),
-                levels.repetition.as_deref(),
+                levels.def_levels(),
+                levels.rep_levels(),
             )?
         }
     };
@@ -593,6 +590,7 @@ macro_rules! def_get_binary_array_fn {
     };
 }
 
+// TODO: These methods don't handle non null indices correctly (#1753)
 def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
 def_get_binary_array_fn!(get_string_array, arrow_array::StringArray);
 def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 9dcb00830..073754262 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -40,114 +40,32 @@
 //!
 //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
 
-use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray};
+use crate::errors::{ParquetError, Result};
+use arrow::array::{
+    make_array, Array, ArrayData, ArrayRef, GenericListArray, MapArray, OffsetSizeTrait,
+    StructArray,
+};
 use arrow::datatypes::{DataType, Field};
-
-/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
-///
-/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track
-/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo]
-/// is created, and this is what is used to index into the array when writing data to Parquet.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub(crate) struct LevelInfo {
-    /// Array's definition levels
-    pub definition: Vec<i16>,
-    /// Array's optional repetition levels
-    pub repetition: Option<Vec<i16>>,
-    /// Array's offsets, 64-bit is used to accommodate large offset arrays
-    pub array_offsets: Vec<i64>,
-    // TODO: Convert to an Arrow Buffer after ARROW-10766 is merged.
-    /// Array's logical validity mask, whcih gets unpacked for list children.
-    /// If the parent of an array is null, all children are logically treated as
-    /// null. This mask keeps track of that.
-    ///
-    pub array_mask: Vec<bool>,
-    /// The maximum definition at this level, 0 at the record batch
-    pub max_definition: i16,
-    /// The type of array represented by this level info
-    pub level_type: LevelType,
-    /// The offset of the current level's array
-    pub offset: usize,
-    /// The length of the current level's array
-    pub length: usize,
-}
-
-/// LevelType defines the type of level, and whether it is nullable or not
-#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-pub(crate) enum LevelType {
-    Root,
-    List(bool),
-    Struct(bool),
-    Primitive(bool),
-}
-
-impl LevelType {
-    #[inline]
-    const fn level_increment(&self) -> i16 {
-        match self {
-            LevelType::Root => 0,
-            // List repetition adds a constant 1
-            LevelType::List(is_nullable) => 1 + *is_nullable as i16,
-            LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
-                *is_nullable as i16
-            }
-        }
-    }
+use std::ops::Range;
+
+/// Performs a depth-first scan of the children of `array`, constructing [`LevelInfo`]
+/// for each leaf column encountered
+pub(crate) fn calculate_array_levels(
+    array: &ArrayRef,
+    field: &Field,
+) -> Result<Vec<LevelInfo>> {
+    let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
+    builder.write(array, 0..array.len());
+    Ok(builder.finish())
 }
 
-impl LevelInfo {
-    /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset.
-    ///
-    /// This is a convenience function to populate the starting point of the traversal.
-    pub(crate) fn new(offset: usize, length: usize) -> Self {
-        Self {
-            // a batch has no definition level yet
-            definition: vec![0; length],
-            // a batch has no repetition as it is not a list
-            repetition: None,
-            // a batch has sequential offsets, should be num_rows + 1
-            array_offsets: (0..=(length as i64)).collect(),
-            // all values at a batch-level are non-null
-            array_mask: vec![true; length],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset,
-            length,
-        }
-    }
-
-    /// Compute nested levels of the Arrow array, recursing into lists and structs.
-    ///
-    /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays.
-    ///
-    /// The parent struct's nullness is tracked, as it determines whether the child
-    /// max_definition should be incremented.
-    /// The 'is_parent_struct' variable asks "is this field's parent a struct?".
-    /// * If we are starting at a [RecordBatch](arrow::record_batch::RecordBatch), this is `false`.
-    /// * If we are calculating a list's child, this is `false`.
-    /// * If we are calculating a struct (i.e. `field.data_type90 == Struct`),
-    /// this depends on whether the struct is a child of a struct.
-    /// * If we are calculating a field inside a [StructArray], this is 'true'.
-    pub(crate) fn calculate_array_levels(
-        &self,
-        array: &ArrayRef,
-        field: &Field,
-    ) -> Vec<Self> {
-        let (array_offsets, array_mask) =
-            Self::get_array_offsets_and_masks(array, self.offset, self.length);
-        match array.data_type() {
-            DataType::Null => vec![Self {
-                definition: self.definition.clone(),
-                repetition: self.repetition.clone(),
-                array_offsets,
-                array_mask,
-                max_definition: self.max_definition.max(1),
-                // Null type is always nullable
-                level_type: LevelType::Primitive(true),
-                offset: self.offset,
-                length: self.length,
-            }],
-            DataType::Boolean
+/// Returns true if the DataType can be represented as a primitive parquet column,
+/// i.e. a leaf array with no children
+fn is_leaf(data_type: &DataType) -> bool {
+    matches!(
+        data_type,
+        DataType::Null
+            | DataType::Boolean
             | DataType::Int8
             | DataType::Int16
             | DataType::Int32
@@ -171,650 +89,391 @@ impl LevelInfo {
             | DataType::Binary
             | DataType::LargeBinary
             | DataType::Decimal(_, _)
-            | DataType::FixedSizeBinary(_) => {
-                // we return a vector of 1 value to represent the primitive
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            | DataType::FixedSizeBinary(_)
+    )
+}
+
+/// The definition and repetition level of an array within a potentially nested hierarchy
+#[derive(Debug, Default, Clone, Copy)]
+struct LevelContext {
+    /// The current repetition level
+    rep_level: i16,
+    /// The current definition level
+    def_level: i16,
+}
+
+/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+enum LevelInfoBuilder {
+    /// A primitive, leaf array
+    Primitive(LevelInfo),
+    /// A list array, contains the [`LevelInfoBuilder`] of the child and
+    /// the [`LevelContext`] of this list
+    List(Box<LevelInfoBuilder>, LevelContext),
+    /// A struct array, contains the [`LevelInfoBuilder`] of its children and
+    /// the [`LevelContext`] of this struct array
+    Struct(Vec<LevelInfoBuilder>, LevelContext),
+}
+
+impl LevelInfoBuilder {
+    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
+    fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
+        match field.data_type() {
+            d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
+                parent_ctx,
+                field.is_nullable(),
+            ))),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => Ok(Self::Primitive(
+                LevelInfo::new(parent_ctx, field.is_nullable()),
+            )),
+            DataType::Struct(children) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 1,
+                    false => parent_ctx.def_level,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level,
+                    def_level,
+                };
+
+                let children = children
+                    .iter()
+                    .map(|f| Self::try_new(f, ctx))
+                    .collect::<Result<_>>()?;
+
+                Ok(Self::Struct(children, ctx))
             }
-            DataType::List(list_field) | DataType::LargeList(list_field) => {
-                let child_offset = array_offsets[0] as usize;
-                let child_len = *array_offsets.last().unwrap() as usize;
-                // Calculate the list level
-                let list_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::List(field.is_nullable()),
-                );
-
-                // Construct the child array of the list, and get its offset + mask
-                let array_data = array.data();
-                let child_data = array_data.child_data().get(0).unwrap();
-                let child_array = make_array(child_data.clone());
-                let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
-                    &child_array,
-                    child_offset,
-                    child_len - child_offset,
-                );
-
-                match child_array.data_type() {
-                    DataType::Null
-                    | DataType::Boolean
-                    | DataType::Int8
-                    | DataType::Int16
-                    | DataType::Int32
-                    | DataType::Int64
-                    | DataType::UInt8
-                    | DataType::UInt16
-                    | DataType::UInt32
-                    | DataType::UInt64
-                    | DataType::Float16
-                    | DataType::Float32
-                    | DataType::Float64
-                    | DataType::Timestamp(_, _)
-                    | DataType::Date32
-                    | DataType::Date64
-                    | DataType::Time32(_)
-                    | DataType::Time64(_)
-                    | DataType::Duration(_)
-                    | DataType::Interval(_)
-                    | DataType::Binary
-                    | DataType::LargeBinary
-                    | DataType::Utf8
-                    | DataType::LargeUtf8
-                    | DataType::Dictionary(_, _)
-                    | DataType::Decimal(_, _)
-                    | DataType::FixedSizeBinary(_) => {
-                        vec![list_level.calculate_child_levels(
-                            child_offsets,
-                            child_mask,
-                            LevelType::Primitive(list_field.is_nullable()),
-                        )]
-                    }
-                    DataType::List(_)
-                    | DataType::LargeList(_)
-                    | DataType::Struct(_)
-                    | DataType::Map(_, _) => {
-                        list_level.calculate_array_levels(&child_array, list_field)
-                    }
-                    DataType::FixedSizeList(_, _) => unimplemented!(),
-                    DataType::Union(_, _, _) => unimplemented!(),
-                }
+            DataType::List(child)
+            | DataType::LargeList(child)
+            | DataType::Map(child, _) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 2,
+                    false => parent_ctx.def_level + 1,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level + 1,
+                    def_level,
+                };
+
+                let child = Self::try_new(child.as_ref(), ctx)?;
+                Ok(Self::List(Box::new(child), ctx))
             }
-            DataType::Map(map_field, _) => {
-                // Calculate the map level
-                let map_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    // A map is treated like a list as it has repetition
-                    LevelType::List(field.is_nullable()),
-                );
-
-                let map_array = array.as_any().downcast_ref::<MapArray>().unwrap();
-
-                let key_array = map_array.keys();
-                let value_array = map_array.values();
-
-                if let DataType::Struct(fields) = map_field.data_type() {
-                    let key_field = &fields[0];
-                    let value_field = &fields[1];
-
-                    let mut map_levels = vec![];
-
-                    // Get key levels
-                    let mut key_levels =
-                        map_level.calculate_array_levels(&key_array, key_field);
-                    map_levels.append(&mut key_levels);
-
-                    let mut value_levels =
-                        map_level.calculate_array_levels(&value_array, value_field);
-                    map_levels.append(&mut value_levels);
-
-                    map_levels
-                } else {
-                    panic!(
-                        "Map field should be a struct, found {:?}",
-                        map_field.data_type()
-                    );
-                }
+            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
+        }
+    }
+
+    /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns
+    /// as enumerated by a depth-first search
+    fn finish(self) -> Vec<LevelInfo> {
+        match self {
+            LevelInfoBuilder::Primitive(v) => vec![v],
+            LevelInfoBuilder::List(v, _) => v.finish(),
+            LevelInfoBuilder::Struct(v, _) => {
+                v.into_iter().flat_map(|l| l.finish()).collect()
             }
-            DataType::FixedSizeList(_, _) => unimplemented!(),
-            DataType::Struct(struct_fields) => {
-                let struct_array: &StructArray = array
+        }
+    }
+
+    /// Given an `array`, write the level data for the elements in `range`
+    fn write(&mut self, array: &ArrayRef, range: Range<usize>) {
+        match array.data_type() {
+            d if is_leaf(d) => self.write_leaf(array, range),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+                self.write_leaf(array, range)
+            }
+            DataType::Struct(_) => {
+                let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+                self.write_struct(array, range)
+            }
+            DataType::List(_) => {
+                let array = array
                     .as_any()
-                    .downcast_ref::<StructArray>()
-                    .expect("Unable to get struct array");
-                let mut struct_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Struct(field.is_nullable()),
-                );
-
-                // If the parent field is a list, calculate the children of the struct as if it
-                // were a list as well.
-                if matches!(self.level_type, LevelType::List(_)) {
-                    struct_level.level_type = LevelType::List(false);
-                }
+                    .downcast_ref::<GenericListArray<i32>>()
+                    .unwrap();
+                self.write_list(array.value_offsets(), array.data(), range)
+            }
+            DataType::LargeList(_) => {
+                let array = array
+                    .as_any()
+                    .downcast_ref::<GenericListArray<i64>>()
+                    .unwrap();
 
-                let mut struct_levels = vec![];
-                struct_array
-                    .columns()
-                    .into_iter()
-                    .zip(struct_fields)
-                    .for_each(|(child_array, child_field)| {
-                        let mut levels =
-                            struct_level.calculate_array_levels(child_array, child_field);
-                        struct_levels.append(&mut levels);
-                    });
-                struct_levels
+                self.write_list(array.value_offsets(), array.data(), range)
             }
-            DataType::Union(_, _, _) => unimplemented!(),
-            DataType::Dictionary(_, _) => {
-                // Need to check for these cases not implemented in C++:
-                // - "Writing DictionaryArray with nested dictionary type not yet supported"
-                // - "Writing DictionaryArray with null encoded in dictionary type not yet supported"
-                // vec![self.get_primitive_def_levels(array, field, array_mask)]
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            DataType::Map(_, _) => {
+                let array = array.as_any().downcast_ref::<MapArray>().unwrap();
+                // A Map is just a ListArray<i32> with a StructArray child, we therefore
+                // treat it as such to avoid code duplication
+                self.write_list(array.value_offsets(), array.data(), range)
             }
+            _ => unreachable!(),
         }
     }
 
-    /// Calculate child/leaf array levels.
-    ///
-    /// The algorithm works by incrementing definitions of array values based on whether:
-    /// - a value is optional or required (is_nullable)
-    /// - a list value is repeated + optional or required (is_list)
-    ///
-    /// A record batch always starts at a populated definition = level 0.
-    /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column `a`
-    /// can only have a maximum level of 1 if it is not null.
-    /// If it is not null, we increment by 1, such that the null slots will = level 1.
-    /// The above applies to types that have no repetition (anything not a list or map).
-    ///
-    /// If a batch has lists, then we increment by up to 2 levels:
-    /// - 1 level for the list (repeated)
-    /// - 1 level if the list itself is nullable (optional)
-    ///
-    /// A list's child then gets incremented using the above rules.
-    ///
-    /// *Exceptions*
-    ///
-    /// There are 2 exceptions from the above rules:
-    ///
-    /// 1. When at the root of the schema: We always increment the
-    /// level regardless of whether the child is nullable or not. If we do not do
-    /// this, we could have a non-nullable array having a definition of 0.
-    ///
-    /// 2. List parent, non-list child: We always increment the level in this case,
-    /// regardless of whether the child is nullable or not.
-    ///
-    /// *Examples*
+    /// Write `range` elements from the ListArray described by `offsets` and `list_data`
     ///
-    /// A batch with only a primitive that's non-nullable. `<primitive[required]>`:
-    /// * We don't increment the definition level as the array is not optional.
-    /// * This would leave us with a definition of 0, so the first exception applies.
-    /// * The definition level becomes 1.
-    ///
-    /// A batch with only a primitive that's nullable. `<primitive[optional]>`:
-    /// * The definition level becomes 1, as we increment it once.
-    ///
-    /// A batch with a single non-nullable list (both list and child not null):
-    /// * We calculate the level twice, for the list, and for the child.
-    /// * At the list, the level becomes 1, where 0 indicates that the list is
-    ///  empty, and 1 says it's not (determined through offsets).
-    /// * At the primitive level, the second exception applies. The level becomes 2.
-    fn calculate_child_levels(
-        &self,
-        // we use 64-bit offsets to also accommodate large arrays
-        array_offsets: Vec<i64>,
-        array_mask: Vec<bool>,
-        level_type: LevelType,
-    ) -> Self {
-        let min_len = *(array_offsets.last().unwrap()) as usize;
-        let mut definition = Vec::with_capacity(min_len);
-        let mut repetition = Vec::with_capacity(min_len);
-        let mut merged_array_mask = Vec::with_capacity(min_len);
-
-        let max_definition = match (self.level_type, level_type) {
-            // Handle the illegal cases
-            (_, LevelType::Root) => {
-                unreachable!("Cannot have a root as a child")
-            }
-            (LevelType::Primitive(_), _) => {
-                unreachable!("Cannot have a primitive parent for any type")
-            }
-            // The general case
-            (_, _) => self.max_definition + level_type.level_increment(),
+    /// Note: MapArrays are ListArray<i32> under the hood and so are dispatched to this method
+    fn write_list<O: OffsetSizeTrait>(
+        &mut self,
+        offsets: &[O],
+        list_data: &ArrayData,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let offsets = &offsets[range.start..range.end + 1];
+        let child_array = make_array(list_data.child_data()[0].clone());
+
+        let write_non_null_slice =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                child.write(&child_array, start_idx..end_idx);
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    let mut rev = rep_levels.iter_mut().rev();
+                    let mut remaining = end_idx - start_idx;
+
+                    loop {
+                        let next = rev.next().unwrap();
+                        if *next > ctx.rep_level {
+                            // Nested element - ignore
+                            continue;
+                        }
+
+                        remaining -= 1;
+                        if remaining == 0 {
+                            *next = ctx.rep_level - 1;
+                            break;
+                        }
+                    }
+                })
+            };
+
+        let write_empty_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 1);
+            })
         };
 
-        match (self.level_type, level_type) {
-            (LevelType::List(_), LevelType::List(is_nullable)) => {
-                // Parent is a list or descendant of a list, and child is a list
-                let reps = self.repetition.clone().unwrap();
-
-                // List is null, and not empty
-                let l1 = max_definition - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                let mut nulls_seen = 0;
-
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        // If the parent length is 0, there won't be a slot for the child
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(0);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+        let write_null_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 2);
+            })
+        };
+
+        match list_data.null_bitmap() {
+            Some(nulls) => {
+                let null_offset = list_data.offset() + range.start;
+                // TODO: Faster bitmask iteration (#1757)
+                for (idx, w) in offsets.windows(2).enumerate() {
+                    let is_valid = nulls.is_set(idx + null_offset);
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if !is_valid {
+                        write_null_slice(child)
+                    } else if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        (start..end).for_each(|parent_index| {
-                            let index = parent_index + nulls_seen - self.offset;
-                            let parent_index = parent_index - self.offset;
-
-                            // parent is either defined at this level, or earlier
-                            let parent_def = self.definition[index];
-                            let parent_rep = reps[index];
-                            let parent_mask = self.array_mask[index];
-
-                            // valid parent, index into children
-                            let child_start = array_offsets[parent_index] as usize;
-                            let child_end = array_offsets[parent_index + 1] as usize;
-                            let child_len = child_end - child_start;
-                            let child_mask = array_mask[parent_index];
-                            let merged_mask = parent_mask && child_mask;
-
-                            if child_len == 0 {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 });
-                                repetition.push(parent_rep);
-                                merged_array_mask.push(merged_mask);
-                            } else {
-                                (child_start..child_end).for_each(|child_index| {
-                                    let rep = match (
-                                        parent_index == start,
-                                        child_index == child_start,
-                                    ) {
-                                        (true, true) => parent_rep,
-                                        (true, false) => parent_rep + 2,
-                                        (false, true) => parent_rep,
-                                        (false, false) => parent_rep + 1,
-                                    };
-
-                                    definition.push(if !parent_mask {
-                                        parent_def
-                                    } else if child_mask {
-                                        l3
-                                    } else {
-                                        l1
-                                    });
-                                    repetition.push(rep);
-                                    merged_array_mask.push(merged_mask);
-                                });
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (LevelType::List(_), _) => {
-                // List and primitive (or struct).
-                // The list can have more values than the primitive, indicating that there
-                // are slots where the list is empty. We use a counter to track this behaviour.
-                let mut nulls_seen = 0;
-
-                // let child_max_definition = list_max_definition + is_nullable as i16;
-                // child values are a function of parent list offsets
-                let reps = self.repetition.as_deref().unwrap();
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(reps[index]);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+            None => {
+                for w in offsets.windows(2) {
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        // iterate through the array, adjusting child definitions for nulls
-                        (start..end).for_each(|child_index| {
-                            let index = child_index + nulls_seen - self.offset;
-                            let child_mask = array_mask[child_index - self.offset];
-                            let parent_mask = self.array_mask[index];
-                            let parent_def = self.definition[index];
-
-                            if !parent_mask || parent_def < self.max_definition {
-                                definition.push(parent_def);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(parent_mask);
-                            } else {
-                                definition.push(max_definition - !child_mask as i16);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(child_mask);
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets: self.array_offsets.clone(),
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (_, LevelType::List(is_nullable)) => {
-                // Encountering a list for the first time.
-                // Calculate the 2 list hierarchy definitions in advance
-
-                // List is null, and not empty
-                let l1 = max_definition - 1 - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                self.definition
-                    .iter()
-                    .enumerate()
-                    .for_each(|(parent_index, def)| {
-                        let child_from = array_offsets[parent_index];
-                        let child_to = array_offsets[parent_index + 1];
-                        let child_len = child_to - child_from;
-                        let child_mask = array_mask[parent_index];
-                        let parent_mask = self.array_mask[parent_index];
-
-                        match (parent_mask, child_len) {
-                            (true, 0) => {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 });
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (false, 0) => {
-                                // Inherit the parent definition as parent was null
-                                definition.push(*def);
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (true, _) => {
-                                (child_from..child_to).for_each(|child_index| {
-                                    // l1 and l3 make sense as list is not empty,
-                                    // but we reflect that it's either null or not
-                                    definition.push(if child_mask { l3 } else { l1 });
-                                    // Mark the first child slot as 0, and the next as 1
-                                    repetition.push(if child_index == child_from {
-                                        0
-                                    } else {
-                                        1
-                                    });
-                                    merged_array_mask.push(child_mask);
-                                });
+        }
+    }
+
+    /// Write `range` elements from StructArray `array`
+    fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
+        let (children, ctx) = match self {
+            Self::Struct(children, ctx) => (children, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+            for child in children {
+                child.visit_leaves(|info| {
+                    let len = range.end - range.start;
+
+                    let def_levels = info.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+
+                    if let Some(rep_levels) = info.rep_levels.as_mut() {
+                        rep_levels.extend(std::iter::repeat(ctx.rep_level).take(len));
+                    }
+                })
+            }
+        };
+
+        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+            for (child_array, child) in array.columns().into_iter().zip(children) {
+                child.write(child_array, range.clone())
+            }
+        };
+
+        match array.data().null_bitmap() {
+            Some(validity) => {
+                let null_offset = array.data().offset();
+                let mut last_non_null_idx = None;
+                let mut last_null_idx = None;
+
+                // TODO: Faster bitmask iteration (#1757)
+                for i in range.clone() {
+                    match validity.is_set(i + null_offset) {
+                        true => {
+                            if let Some(last_idx) = last_null_idx.take() {
+                                write_null(children, last_idx..i)
                             }
-                            (false, _) => {
-                                (child_from..child_to).for_each(|child_index| {
-                                    // Inherit the parent definition as parent was null
-                                    definition.push(*def);
-                                    // mark the first child slot as 0, and the next as 1
-                                    repetition.push(if child_index == child_from {
-                                        0
-                                    } else {
-                                        1
-                                    });
-                                    merged_array_mask.push(false);
-                                });
+                            last_non_null_idx.get_or_insert(i);
+                        }
+                        false => {
+                            if let Some(last_idx) = last_non_null_idx.take() {
+                                write_non_null(children, last_idx..i)
                             }
+                            last_null_idx.get_or_insert(i);
                         }
-                    });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset,
-                    length,
+                    }
+                }
+
+                if let Some(last_idx) = last_null_idx.take() {
+                    write_null(children, last_idx..range.end)
+                }
+
+                if let Some(last_idx) = last_non_null_idx.take() {
+                    write_non_null(children, last_idx..range.end)
                 }
             }
-            (_, _) => {
-                self.definition
-                    .iter()
-                    .zip(array_mask.into_iter().zip(&self.array_mask))
-                    .for_each(|(current_def, (child_mask, parent_mask))| {
-                        merged_array_mask.push(*parent_mask && child_mask);
-                        match (parent_mask, child_mask) {
-                            (true, true) => {
-                                definition.push(max_definition);
-                            }
-                            (true, false) => {
-                                // The child is only legally null if its array is nullable.
-                                // Thus parent's max_definition is lower
-                                definition.push(if *current_def <= self.max_definition {
-                                    *current_def
-                                } else {
-                                    self.max_definition
-                                });
-                            }
-                            // if the parent was false, retain its definitions
-                            (false, _) => {
-                                definition.push(*current_def);
+            None => write_non_null(children, range),
+        }
+    }
+
+    /// Write a primitive array, as defined by [`is_leaf`]
+    fn write_leaf(&mut self, array: &ArrayRef, range: Range<usize>) {
+        let info = match self {
+            Self::Primitive(info) => info,
+            _ => unreachable!(),
+        };
+
+        let len = range.end - range.start;
+
+        match &mut info.def_levels {
+            Some(def_levels) => {
+                def_levels.reserve(len);
+                info.non_null_indices.reserve(len);
+
+                match array.data().null_bitmap() {
+                    Some(nulls) => {
+                        let nulls_offset = array.data().offset();
+                        // TODO: Faster bitmask iteration (#1757)
+                        for i in range {
+                            match nulls.is_set(i + nulls_offset) {
+                                true => {
+                                    def_levels.push(info.max_def_level);
+                                    info.non_null_indices.push(i)
+                                }
+                                false => def_levels.push(info.max_def_level - 1),
                             }
                         }
-                    });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                Self {
-                    definition,
-                    repetition: self.repetition.clone(), // it's None
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    // Inherit parent offset and length
-                    offset: self.offset,
-                    length: self.length,
+                    }
+                    None => {
+                        let iter = std::iter::repeat(info.max_def_level).take(len);
+                        def_levels.extend(iter);
+                        info.non_null_indices.extend(range);
+                    }
                 }
             }
+            None => info.non_null_indices.extend(range),
+        }
+
+        if let Some(rep_levels) = &mut info.rep_levels {
+            rep_levels.extend(std::iter::repeat(info.max_rep_level).take(len))
         }
     }
 
-    /// Get the offsets of an array as 64-bit values, and validity masks as booleans
-    /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained
-    ///   from validity bitmap
-    /// - List array offsets will be the value offsets, masks are computed from offsets
-    fn get_array_offsets_and_masks(
-        array: &ArrayRef,
-        offset: usize,
-        len: usize,
-    ) -> (Vec<i64>, Vec<bool>) {
-        match array.data_type() {
-            // A NullArray is entirely nulls, despite not containing a null buffer
-            DataType::Null => ((0..=(len as i64)).collect(), vec![false; len]),
-            DataType::Boolean
-            | DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Float16
-            | DataType::Float32
-            | DataType::Float64
-            | DataType::Timestamp(_, _)
-            | DataType::Date32
-            | DataType::Date64
-            | DataType::Time32(_)
-            | DataType::Time64(_)
-            | DataType::Duration(_)
-            | DataType::Interval(_)
-            | DataType::Binary
-            | DataType::LargeBinary
-            | DataType::Utf8
-            | DataType::LargeUtf8
-            | DataType::Struct(_)
-            | DataType::Dictionary(_, _)
-            | DataType::Decimal(_, _) => {
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                ((0..=(len as i64)).collect(), array_mask)
-            }
-            DataType::List(_) | DataType::Map(_, _) => {
-                let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
-                let offsets = offsets
-                    .iter()
-                    .copied()
-                    .skip(array.offset() + offset)
-                    .take(len + 1)
-                    .map(|v| v as i64)
-                    .collect::<Vec<i64>>();
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                (offsets, array_mask)
-            }
-            DataType::LargeList(_) => {
-                let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
-                    .iter()
-                    .skip(array.offset() + offset)
-                    .take(len + 1)
-                    .copied()
-                    .collect();
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                (offsets, array_mask)
-            }
-            DataType::FixedSizeBinary(value_len) => {
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                let value_len = *value_len as i64;
-                (
-                    (0..=(len as i64)).map(|v| v * value_len).collect(),
-                    array_mask,
-                )
-            }
-            DataType::FixedSizeList(_, _) | DataType::Union(_, _, _) => {
-                unimplemented!("Getting offsets not yet implemented")
+    /// Visits all leaves beneath this node in depth-first order
+    fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) {
+        match self {
+            LevelInfoBuilder::Primitive(info) => visit(info),
+            LevelInfoBuilder::List(c, _) => c.visit_leaves(visit),
+            LevelInfoBuilder::Struct(children, _) => {
+                for c in children {
+                    c.visit_leaves(visit)
+                }
             }
         }
     }
+}
+/// The data necessary to write a primitive Arrow array to parquet, taking into account
+/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct LevelInfo {
+    /// Array's definition levels
+    ///
+    /// Present if `max_def_level != 0`
+    def_levels: Option<Vec<i16>>,
 
-    /// Given a level's information, calculate the offsets required to index an array correctly.
-    pub(crate) fn filter_array_indices(&self) -> Vec<usize> {
-        if !matches!(self.level_type, LevelType::Primitive(_)) {
-            panic!(
-                "Cannot filter indices on a non-primitive array, found {:?}",
-                self.level_type
-            );
-        }
+    /// Array's optional repetition levels
+    ///
+    /// Present if `max_rep_level != 0`
+    rep_levels: Option<Vec<i16>>,
 
-        // happy path if not dealing with lists
-        if self.repetition.is_none() {
-            return self
-                .definition
-                .iter()
-                .enumerate()
-                .filter_map(|(i, def)| {
-                    if *def == self.max_definition {
-                        Some(i)
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-        }
+    /// The indices of the non-null values
+    /// in the primitive array
+    non_null_indices: Vec<usize>,
 
-        let mut filtered = vec![];
-        let mut definition_levels = self.definition.iter();
-        let mut index = 0;
-
-        for len in self.array_offsets.windows(2).map(|s| s[1] - s[0]) {
-            if len == 0 {
-                // Skip this definition level--the iterator should not be empty, and the definition
-                // level be less than max_definition, i.e., a null value)
-                assert!(*definition_levels.next().unwrap() < self.max_definition);
-            } else {
-                for (_, def) in (0..len).zip(&mut definition_levels) {
-                    if *def == self.max_definition {
-                        filtered.push(index);
-                    }
-                    index += 1;
-                }
-            }
+    /// The maximum definition level for this leaf column
+    max_def_level: i16,
+
+    /// The maximum repetition level for this leaf column
+    max_rep_level: i16,
+}
+
+impl LevelInfo {
+    fn new(ctx: LevelContext, is_nullable: bool) -> Self {
+        let max_rep_level = ctx.rep_level;
+        let max_def_level = match is_nullable {
+            true => ctx.def_level + 1,
+            false => ctx.def_level,
+        };
+
+        Self {
+            def_levels: (max_def_level != 0).then(Vec::new),
+            rep_levels: (max_rep_level != 0).then(Vec::new),
+            non_null_indices: vec![],
+            max_def_level,
+            max_rep_level,
         }
+    }
 
-        filtered
+    pub fn def_levels(&self) -> Option<&[i16]> {
+        self.def_levels.as_deref()
     }
-}
 
-/// Convert an Arrow buffer to a boolean array slice
-/// TODO: this was created for buffers, so might not work for bool array, might be slow too
-#[inline]
-fn get_bool_array_slice(
-    buffer: &arrow::buffer::Buffer,
-    offset: usize,
-    len: usize,
-) -> Vec<bool> {
-    let data = buffer.as_slice();
-    (offset..(len + offset))
-        .map(|i| arrow::util::bit_util::get_bit(data, i))
-        .collect()
+    pub fn rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels.as_deref()
+    }
+
+    pub fn non_null_indices(&self) -> &[usize] {
+        &self.non_null_indices
+    }
 }
 
 #[cfg(test)]
@@ -825,203 +484,161 @@ mod tests {
 
     use arrow::array::*;
     use arrow::buffer::Buffer;
-    use arrow::datatypes::{Schema, ToByteSlice};
+    use arrow::datatypes::{Int32Type, Schema, ToByteSlice};
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_columns;
 
     #[test]
     fn test_calculate_array_levels_twitter_example() {
         // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
         // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
-        let parent_levels = LevelInfo {
-            definition: vec![0, 0],
-            repetition: None,
-            array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
-            array_mask: vec![true, true], // both lists defined
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 2,
-        };
-        // offset into array, each level1 has 2 values
-        let array_offsets = vec![0, 2, 4];
-        let array_mask = vec![true, true];
-
-        // calculate level1 levels
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        //
-        let expected_levels = LevelInfo {
-            definition: vec![1, 1, 1, 1],
-            repetition: Some(vec![0, 1, 0, 1]),
-            array_offsets,
-            array_mask: vec![true, true, true, true],
-            max_definition: 1,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 4,
-        };
-        // the separate asserts make it easier to see what's failing
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        // this assert is to help if there are more variables added to the struct
-        assert_eq!(&levels, &expected_levels);
-
-        // level2
-        let parent_levels = levels;
-        let array_offsets = vec![0, 3, 7, 8, 10];
-        let array_mask = vec![true, true, true, true];
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(false),
-        );
-        let expected_levels = LevelInfo {
-            definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
-            repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
-            array_offsets,
-            array_mask: vec![true; 10],
-            max_definition: 2,
-            level_type: LevelType::List(false),
-            offset: 0,
-            length: 10,
+
+        let leaf_type = Field::new("item", DataType::Int32, false);
+        let inner_type = DataType::List(Box::new(leaf_type));
+        let inner_field = Field::new("l2", inner_type.clone(), false);
+        let outer_type = DataType::List(Box::new(inner_field));
+        let outer_field = Field::new("l1", outer_type.clone(), false);
+
+        let primitives = Int32Array::from_iter(0..10);
+
+        // Cannot use from_iter_primitive as it always infers nullable
+        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
+        let inner_list = ArrayDataBuilder::new(inner_type)
+            .len(4)
+            .add_buffer(offsets)
+            .add_child_data(primitives.data().clone())
+            .build()
+            .unwrap();
+
+        let offsets = Buffer::from_iter([0_i32, 2, 4]);
+        let outer_list = ArrayDataBuilder::new(outer_type)
+            .len(2)
+            .add_buffer(offsets)
+            .add_child_data(inner_list)
+            .build()
+            .unwrap();
+        let outer_list = make_array(outer_list);
+
+        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected = LevelInfo {
+            def_levels: Some(vec![2; 10]),
+            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
+            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+            max_def_level: 2,
+            max_rep_level: 2,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected);
     }
 
     #[test]
     fn test_calculate_one_level_1() {
         // This test calculates the levels for a non-null primitive array
-        let parent_levels = LevelInfo {
-            definition: vec![0; 10],
-            repetition: None,
-            array_offsets: (0..=10).collect(),
-            array_mask: vec![true; 10],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 10,
-        };
-        let array_offsets: Vec<i64> = (0..=10).collect();
-        let array_mask = vec![true; 10];
+        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
+        let field = Field::new("item", DataType::Int32, false);
+
+        let levels = calculate_array_levels(&array, &field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask.clone(),
-            LevelType::Primitive(false),
-        );
         let expected_levels = LevelInfo {
-            // As it is non-null, definitions can be omitted
-            definition: vec![0; 10],
-            repetition: None,
-            array_offsets,
-            array_mask,
-            max_definition: 0,
-            level_type: LevelType::Primitive(false),
-            offset: 0,
-            length: 10,
+            def_levels: None,
+            rep_levels: None,
+            non_null_indices: (0..10).collect(),
+            max_def_level: 0,
+            max_rep_level: 0,
         };
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
     fn test_calculate_one_level_2() {
-        // This test calculates the levels for a non-null primitive array
-        let parent_levels = LevelInfo {
-            definition: vec![0; 5],
-            repetition: None,
-            array_offsets: (0..=5).collect(),
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 5,
-        };
-        let array_offsets: Vec<i64> = (0..=5).collect();
-        let array_mask = vec![true, false, true, true, false];
+        // This test calculates the levels for a nullable primitive array
+        let array = Arc::new(Int32Array::from_iter([
+            Some(0),
+            None,
+            Some(0),
+            Some(0),
+            None,
+        ])) as ArrayRef;
+        let field = Field::new("item", DataType::Int32, true);
+
+        let levels = calculate_array_levels(&array, &field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask.clone(),
-            LevelType::Primitive(true),
-        );
         let expected_levels = LevelInfo {
-            definition: vec![1, 0, 1, 1, 0],
-            repetition: None,
-            array_offsets,
-            array_mask,
-            max_definition: 1,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![1, 0, 1, 1, 0]),
+            rep_levels: None,
+            non_null_indices: vec![0, 2, 3],
+            max_def_level: 1,
+            max_rep_level: 0,
         };
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
     fn test_calculate_array_levels_1() {
+        let leaf_field = Field::new("item", DataType::Int32, false);
+        let list_type = DataType::List(Box::new(leaf_field));
+
         // if all array values are defined (e.g. batch<list<_>>)
         // [[0], [1], [2], [3], [4]]
-        let parent_levels = LevelInfo {
-            definition: vec![0; 5],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 5,
+
+        let leaf_array = Int32Array::from_iter(0..5);
+        // Cannot use from_iter_primitive as it always infers nullable
+        let offsets = Buffer::from_iter(0_i32..6);
+        let list = ArrayDataBuilder::new(list_type.clone())
+            .len(5)
+            .add_buffer(offsets)
+            .add_child_data(leaf_array.data().clone())
+            .build()
+            .unwrap();
+        let list = make_array(list);
+
+        let list_field = Field::new("list", list_type.clone(), false);
+        let levels = calculate_array_levels(&list, &list_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected_levels = LevelInfo {
+            def_levels: Some(vec![1; 5]),
+            rep_levels: Some(vec![0; 5]),
+            non_null_indices: (0..5).collect(),
+            max_def_level: 1,
+            max_rep_level: 1,
         };
-        let array_offsets = vec![0, 2, 2, 4, 8, 11];
-        let array_mask = vec![true, false, true, true, true];
+        assert_eq!(&levels[0], &expected_levels);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(true),
-        );
-        // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
+        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
         // all values are defined as we do not have nulls on the root (batch)
         // repetition:
         //   0: 0, 1
-        //   1:
+        //   1: 0
         //   2: 0, 1
         //   3: 0, 1, 1, 1
         //   4: 0, 1, 1
+        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
+        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
+        let list = ArrayDataBuilder::new(list_type.clone())
+            .len(5)
+            .add_buffer(offsets)
+            .add_child_data(leaf_array.data().clone())
+            .null_bit_buffer(Some(Buffer::from([0b00011101])))
+            .build()
+            .unwrap();
+        let list = make_array(list);
+
+        let list_field = Field::new("list", list_type, true);
+        let levels = calculate_array_levels(&list, &list_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
         let expected_levels = LevelInfo {
-            // The levels are normally 2 because we:
-            // - Calculate the level at the list
-            // - Calculate the level at the list's child
-            // We do not do this in these tests, thus the levels are 1 less.
-            definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
-            repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
-            array_offsets,
-            array_mask: vec![
-                true, true, false, true, true, true, true, true, true, true, true, true,
-            ],
-            max_definition: 2,
-            level_type: LevelType::List(true),
-            offset: 0,
-            length: 11, // the child has 11 slots
+            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
+            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
+            non_null_indices: (0..11).collect(),
+            max_def_level: 2,
+            max_rep_level: 1,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
@@ -1039,210 +656,200 @@ mod tests {
         // If the first values of a list are null due to a parent, we have to still account for them
         // while indexing, because they would affect the way the child is indexed
         // i.e. in the above example, we have to know that [0, 1] has to be skipped
-        let parent_levels = LevelInfo {
-            definition: vec![0, 1, 0, 1, 1],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![false, true, false, true, true],
-            max_definition: 1,
-            level_type: LevelType::Struct(true),
-            offset: 0,
-            length: 5,
-        };
-        let array_offsets = vec![0, 2, 2, 4, 8, 11];
-        let array_mask = vec![true, false, true, true, true];
+        let leaf = Int32Array::from_iter(0..11);
+        let leaf_field = Field::new("leaf", DataType::Int32, false);
+
+        let list_type = DataType::List(Box::new(leaf_field));
+        let list = ArrayData::builder(list_type.clone())
+            .len(5)
+            .add_child_data(leaf.data().clone())
+            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
+            .build()
+            .unwrap();
+
+        let list = make_array(list);
+        let list_field = Field::new("list", list_type, true);
+
+        let struct_array =
+            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
+        let array = Arc::new(struct_array) as ArrayRef;
+
+        let struct_field = Field::new("struct", array.data_type().clone(), true);
+
+        let levels = calculate_array_levels(&array, &struct_field).unwrap();
+        assert_eq!(levels.len(), 1);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(true),
-        );
         let expected_levels = LevelInfo {
-            // 0 1 [2] are 0 (not defined at level 1)
-            // [2] is 1, but has 0 slots so is not populated (defined at level 1 only)
-            // 2 3 [4] are 0
-            // 4 5 6 7 [8] are 1 (defined at level 1 only)
-            // 8 9 10 [11] are 2 (defined at both levels)
-            definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
-            repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
-            array_offsets,
-            array_mask: vec![
-                false, false, false, false, false, true, true, true, true, true, true,
-                true,
-            ],
-            max_definition: 3,
-            level_type: LevelType::List(true),
-            offset: 0,
-            length: 11,
+            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
+            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
+            non_null_indices: (4..11).collect(),
+            max_def_level: 3,
+            max_rep_level: 1,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
-
-        // nested lists (using previous test)
-        let nested_parent_levels = levels;
-        let array_offsets = vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22];
-        let array_mask = vec![
-            true, true, true, true, true, true, true, true, true, true, true,
-        ];
-        let levels = nested_parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(true),
-        );
+
+        assert_eq!(&levels[0], &expected_levels);
+
+        // nested lists
+
+        // 0: [[100, 101], [102, 103]]
+        // 1: []
+        // 2: [[104, 105], [106, 107]]
+        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
+        // 4: [[116, 117], [118, 119], [120, 121]]
+
+        let leaf = Int32Array::from_iter(100..122);
+        let leaf_field = Field::new("leaf", DataType::Int32, true);
+
+        let l1_type = DataType::List(Box::new(leaf_field));
+        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
+        let l1 = ArrayData::builder(l1_type.clone())
+            .len(11)
+            .add_child_data(leaf.data().clone())
+            .add_buffer(offsets)
+            .build()
+            .unwrap();
+
+        let l1_field = Field::new("l1", l1_type, true);
+        let l2_type = DataType::List(Box::new(l1_field));
+        let l2 = ArrayData::builder(l2_type)
+            .len(5)
+            .add_child_data(l1)
+            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
+            .build()
+            .unwrap();
+
+        let l2 = make_array(l2);
+        let l2_field = Field::new("l2", l2.data_type().clone(), true);
+
+        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
         let expected_levels = LevelInfo {
-            // (def: 0) 0 1 [2] are 0 (take parent)
-            // (def: 0) 2 3 [4] are 0 (take parent)
-            // (def: 0) 4 5 [6] are 0 (take parent)
-            // (def: 0) 6 7 [8] are 0 (take parent)
-            // (def: 1) 8 9 [10] are 1 (take parent)
-            // (def: 1) 10 11 [12] are 1 (take parent)
-            // (def: 1) 12 23 [14] are 1 (take parent)
-            // (def: 1) 14 15 [16] are 1 (take parent)
-            // (def: 2) 16 17 [18] are 2 (defined at all levels)
-            // (def: 2) 18 19 [20] are 2 (defined at all levels)
-            // (def: 2) 20 21 [22] are 2 (defined at all levels)
-            //
-            // 0 1 [2] are 0 (not defined at level 1)
-            // [2] is 1, but has 0 slots so is not populated (defined at level 1 only)
-            // 2 3 [4] are 0
-            // 4 5 6 7 [8] are 1 (defined at level 1 only)
-            // 8 9 10 [11] are 2 (defined at both levels)
-            //
-            // 0: [[100, 101], [102, 103]]
-            // 1: []
-            // 2: [[104, 105], [106, 107]]
-            // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
-            // 4: [[116, 117], [118, 119], [120, 121]]
-            definition: vec![
-                0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
-            ],
-            repetition: Some(vec![
+            def_levels: Some(vec![
+                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
+            ]),
+            rep_levels: Some(vec![
                 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
             ]),
-            array_offsets,
-            array_mask: vec![
-                false, false, false, false, false, false, false, false, false, true,
-                true, true, true, true, true, true, true, true, true, true, true, true,
-                true,
-            ],
-            max_definition: 5,
-            level_type: LevelType::List(true),
-            offset: 0,
-            length: 22,
+            non_null_indices: (0..22).collect(),
+            max_def_level: 5,
+            max_rep_level: 2,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
+
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
     fn test_calculate_array_levels_nested_list() {
+        let leaf_field = Field::new("leaf", DataType::Int32, false);
+        let list_type = DataType::List(Box::new(leaf_field));
+
         // if all array values are defined (e.g. batch<list<_>>)
         // The array at this level looks like:
         // 0: [a]
         // 1: [a]
         // 2: [a]
         // 3: [a]
-        let parent_levels = LevelInfo {
-            definition: vec![1, 1, 1, 1],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4],
-            array_mask: vec![true, true, true, true],
-            max_definition: 1,
-            level_type: LevelType::Struct(true),
-            offset: 0,
-            length: 4,
+
+        let leaf = Int32Array::from_iter([0; 4]);
+        let list = ArrayData::builder(list_type.clone())
+            .len(4)
+            .add_buffer(Buffer::from_iter(0_i32..5))
+            .add_child_data(leaf.data().clone())
+            .build()
+            .unwrap();
+        let list = make_array(list);
+
+        let list_field = Field::new("list", list_type.clone(), false);
+        let levels = calculate_array_levels(&list, &list_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected_levels = LevelInfo {
+            def_levels: Some(vec![1; 4]),
+            rep_levels: Some(vec![0; 4]),
+            non_null_indices: (0..4).collect(),
+            max_def_level: 1,
+            max_rep_level: 1,
         };
-        // 0: null ([], but mask is false, so it's not just an empty list)
-        // 1: [1, 2, 3]
-        // 2: [4, 5]
-        // 3: [6, 7]
-        let array_offsets = vec![0, 1, 4, 6, 8];
-        let array_mask = vec![false, true, true, true];
+        assert_eq!(&levels[0], &expected_levels);
 
-        let levels = parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(true),
-        );
-        // 0: [null], level 1 is defined, but not 2
+        // 0: null
         // 1: [1, 2, 3]
         // 2: [4, 5]
         // 3: [6, 7]
+        let leaf = Int32Array::from_iter(0..8);
+        let list = ArrayData::builder(list_type.clone())
+            .len(4)
+            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
+            .null_bit_buffer(Some(Buffer::from([0b00001110])))
+            .add_child_data(leaf.data().clone())
+            .build()
+            .unwrap();
+        let list = make_array(list);
+        let list_field = Field::new("list", list_type, true);
+
+        let struct_array = StructArray::from(vec![(list_field, list)]);
+        let array = Arc::new(struct_array) as ArrayRef;
+
+        let struct_field = Field::new("struct", array.data_type().clone(), true);
+        let levels = calculate_array_levels(&array, &struct_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
         let expected_levels = LevelInfo {
-            definition: vec![1, 3, 3, 3, 3, 3, 3, 3],
-            repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
-            array_offsets,
-            array_mask: vec![false, true, true, true, true, true, true, true],
-            max_definition: 3,
-            level_type: LevelType::List(true),
-            offset: 0,
-            length: 8,
+            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
+            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
+            non_null_indices: (0..7).collect(),
+            max_def_level: 3,
+            max_rep_level: 1,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
-
-        // nested lists (using previous test)
-        let nested_parent_levels = levels;
-        // 0: [null] (was a populated null slot at the parent)
-        // 1: [201]
-        // 2: [202, 203]
-        // 3: null ([])
-        // 4: [204, 205, 206]
-        // 5: [207, 208, 209, 210]
-        // 6: [] (tests a non-null empty list slot)
-        // 7: [211, 212, 213, 214, 215]
-        let array_offsets = vec![0, 1, 2, 4, 4, 7, 11, 11, 16];
-        // logically, the fist slot of the mask is false
-        let array_mask = vec![true, true, true, false, true, true, true, true];
-        let levels = nested_parent_levels.calculate_child_levels(
-            array_offsets.clone(),
-            array_mask,
-            LevelType::List(true),
-        );
-        // We have 7 array values, and at least 15 primitives (from array_offsets)
-        // 0: (-)[null], parent was null, no value populated here
-        // 1: (0)[201], (1)[202, 203], (2)[[null]]
-        // 2: (3)[204, 205, 206], (4)[207, 208, 209, 210]
-        // 3: (5)[[]], (6)[211, 212, 213, 214, 215]
-        //
+        assert_eq!(&levels[0], &expected_levels);
+
+        // nested lists
         // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
-        // 0: {"struct": [ null ]}
+        // 0: {"struct": null }
         // 1: {"struct": [ [201], [202, 203], [] ]}
         // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
         // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
+
+        let leaf = Int32Array::from_iter(201..216);
+        let leaf_field = Field::new("leaf", DataType::Int32, false);
+        let list_1_type = DataType::List(Box::new(leaf_field));
+        let list_1 = ArrayData::builder(list_1_type.clone())
+            .len(7)
+            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
+            .add_child_data(leaf.data().clone())
+            .build()
+            .unwrap();
+
+        let list_1_field = Field::new("l1", list_1_type, true);
+        let list_2_type = DataType::List(Box::new(list_1_field));
+        let list_2 = ArrayData::builder(list_2_type.clone())
+            .len(4)
+            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
+            .null_bit_buffer(Some(Buffer::from([0b00001110])))
+            .add_child_data(list_1)
+            .build()
+            .unwrap();
+
+        let list_2 = make_array(list_2);
+        let list_2_field = Field::new("list_2", list_2_type, true);
+
+        let struct_array =
+            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
+        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
+
+        let array = Arc::new(struct_array) as ArrayRef;
+        let levels = calculate_array_levels(&array, &struct_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
         let expected_levels = LevelInfo {
-            definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5],
-            repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
-            array_mask: vec![
-                false, true, true, true, false, true, true, true, true, true, true, true,
-                true, true, true, true, true, true,
-            ],
-            array_offsets,
-            max_definition: 5,
-            level_type: LevelType::List(true),
-            offset: 0,
-            length: 16,
+            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
+            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
+            non_null_indices: (0..15).collect(),
+            max_def_level: 5,
+            max_rep_level: 2,
         };
-        assert_eq!(&levels.definition, &expected_levels.definition);
-        assert_eq!(&levels.repetition, &expected_levels.repetition);
-        assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.array_mask, &expected_levels.array_mask);
-        assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.level_type, &expected_levels.level_type);
-        assert_eq!(&levels, &expected_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
@@ -1255,54 +862,34 @@ mod tests {
         //  - {a: {b: null}}
         //  - {a: null}}
         //  - {a: {b: {c: 6}}}
-        let a_levels = LevelInfo {
-            definition: vec![1, 1, 1, 1, 0, 1],
-            repetition: None,
-            array_offsets: (0..=6).collect(),
-            array_mask: vec![true, true, true, true, false, true],
-            max_definition: 1,
-            level_type: LevelType::Struct(true),
-            offset: 0,
-            length: 6,
-        };
-        // b's offset and mask
-        let b_offsets: Vec<i64> = (0..=6).collect();
-        let b_mask = vec![true, true, true, false, false, true];
-        // b's expected levels
-        let b_expected_levels = LevelInfo {
-            definition: vec![2, 2, 2, 1, 0, 2],
-            repetition: None,
-            array_offsets: (0..=6).collect(),
-            array_mask: vec![true, true, true, false, false, true],
-            max_definition: 2,
-            level_type: LevelType::Struct(true),
-            offset: 0,
-            length: 6,
-        };
-        let b_levels = a_levels.calculate_child_levels(
-            b_offsets.clone(),
-            b_mask,
-            LevelType::Struct(true),
-        );
-        assert_eq!(&b_expected_levels, &b_levels);
-
-        // c's offset and mask
-        let c_offsets = b_offsets;
-        let c_mask = vec![true, false, true, false, false, true];
-        // c's expected levels
-        let c_expected_levels = LevelInfo {
-            definition: vec![3, 2, 3, 1, 0, 3],
-            repetition: None,
-            array_offsets: c_offsets.clone(),
-            array_mask: vec![true, false, true, false, false, true],
-            max_definition: 3,
-            level_type: LevelType::Struct(true),
-            offset: 0,
-            length: 6,
+
+        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
+        let c_field = Field::new("c", DataType::Int32, true);
+        let b = StructArray::from((
+            (vec![(c_field, Arc::new(c) as ArrayRef)]),
+            Buffer::from([0b00110111]),
+        ));
+
+        let b_field = Field::new("b", b.data_type().clone(), true);
+        let a = StructArray::from((
+            (vec![(b_field, Arc::new(b) as ArrayRef)]),
+            Buffer::from([0b00101111]),
+        ));
+
+        let a_field = Field::new("a", a.data_type().clone(), true);
+        let a_array = Arc::new(a) as ArrayRef;
+
+        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
+        assert_eq!(levels.len(), 1);
+
+        let expected_levels = LevelInfo {
+            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
+            rep_levels: None,
+            non_null_indices: vec![0, 2, 5],
+            max_def_level: 3,
+            max_rep_level: 0,
         };
-        let c_levels =
-            b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true));
-        assert_eq!(&c_expected_levels, &c_levels);
+        assert_eq!(&levels[0], &expected_levels);
     }
 
     #[test]
@@ -1310,8 +897,7 @@ mod tests {
         // this tests the level generation from the arrow_writer equivalent test
 
         let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
-        let a_value_offsets =
-            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
         let a_list_type =
             DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
         let a_list_data = ArrayData::builder(a_list_type.clone())
@@ -1325,56 +911,25 @@ mod tests {
         assert_eq!(a_list_data.null_count(), 1);
 
         let a = ListArray::from(a_list_data);
-        let values = Arc::new(a);
+        let values = Arc::new(a) as _;
 
-        let schema = Schema::new(vec![Field::new("item", a_list_type, true)]);
+        let item_field = Field::new("item", a_list_type, true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&values, 2..4);
+        let levels = builder.finish();
 
-        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
-
-        let expected_batch_level = LevelInfo {
-            definition: vec![0; 2],
-            repetition: None,
-            array_offsets: (0..=2).collect(),
-            array_mask: vec![true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 2,
-            length: 2,
-        };
-
-        let batch_level = LevelInfo::new(2, 2);
-        assert_eq!(&batch_level, &expected_batch_level);
-
-        // calculate the list's level
-        let mut levels = vec![];
-        batch
-            .columns()
-            .iter()
-            .zip(batch.schema().fields())
-            .for_each(|(array, field)| {
-                let mut array_levels = batch_level.calculate_array_levels(array, field);
-                levels.append(&mut array_levels);
-            });
         assert_eq!(levels.len(), 1);
 
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![0, 3, 3, 3],
-            repetition: Some(vec![0, 0, 1, 1]),
-            array_offsets: vec![3, 3, 6],
-            array_mask: vec![false, true, true, true],
-            max_definition: 3,
-            level_type: LevelType::Primitive(true),
-            offset: 3,
-            length: 3,
+            def_levels: Some(vec![0, 3, 3, 3]),
+            rep_levels: Some(vec![0, 0, 1, 1]),
+            non_null_indices: vec![3, 4, 5],
+            max_def_level: 3,
+            max_rep_level: 1,
         };
-        assert_eq!(&list_level.definition, &expected_level.definition);
-        assert_eq!(&list_level.repetition, &expected_level.repetition);
-        assert_eq!(&list_level.array_offsets, &expected_level.array_offsets);
-        assert_eq!(&list_level.array_mask, &expected_level.array_mask);
-        assert_eq!(&list_level.max_definition, &expected_level.max_definition);
-        assert_eq!(&list_level.level_type, &expected_level.level_type);
         assert_eq!(list_level, &expected_level);
     }
 
@@ -1445,20 +1000,6 @@ mod tests {
         .unwrap();
 
         //////////////////////////////////////////////
-        let expected_batch_level = LevelInfo {
-            definition: vec![0; 5],
-            repetition: None,
-            array_offsets: (0..=5).collect(),
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 5,
-        };
-
-        let batch_level = LevelInfo::new(0, 5);
-        assert_eq!(&batch_level, &expected_batch_level);
-
         // calculate the list's level
         let mut levels = vec![];
         batch
@@ -1466,7 +1007,7 @@ mod tests {
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels = batch_level.calculate_array_levels(array, field);
+                let mut array_levels = calculate_array_levels(array, field).unwrap();
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 5);
@@ -1475,14 +1016,11 @@ mod tests {
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![0, 0, 0, 0, 0],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![true, true, true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Primitive(false),
-            offset: 0,
-            length: 5,
+            def_levels: None,
+            rep_levels: None,
+            non_null_indices: vec![0, 1, 2, 3, 4],
+            max_def_level: 0,
+            max_rep_level: 0,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1490,14 +1028,11 @@ mod tests {
         let list_level = levels.get(1).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![1, 0, 0, 1, 1],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![true, false, false, true, true],
-            max_definition: 1,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![1, 0, 0, 1, 1]),
+            rep_levels: None,
+            non_null_indices: vec![0, 3, 4],
+            max_def_level: 1,
+            max_rep_level: 0,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1505,14 +1040,11 @@ mod tests {
         let list_level = levels.get(2).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![1, 1, 1, 2, 1],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![false, false, false, true, false],
-            max_definition: 2,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![1, 1, 1, 2, 1]),
+            rep_levels: None,
+            non_null_indices: vec![3],
+            max_def_level: 2,
+            max_rep_level: 0,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1520,36 +1052,15 @@ mod tests {
         let list_level = levels.get(3).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![3, 2, 3, 2, 3],
-            repetition: None,
-            array_offsets: vec![0, 1, 2, 3, 4, 5],
-            array_mask: vec![true, false, true, false, true],
-            max_definition: 3,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![3, 2, 3, 2, 3]),
+            rep_levels: None,
+            non_null_indices: vec![0, 2, 4],
+            max_def_level: 3,
+            max_rep_level: 0,
         };
         assert_eq!(list_level, &expected_level);
     }
 
-    #[test]
-    fn test_filter_array_indices() {
-        let level = LevelInfo {
-            definition: vec![3, 3, 3, 1, 3, 3, 3],
-            repetition: Some(vec![0, 1, 1, 0, 0, 1, 1]),
-            array_offsets: vec![0, 3, 3, 6],
-            array_mask: vec![true, true, true, false, true, true, true],
-            max_definition: 3,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 6,
-        };
-
-        let expected = vec![0, 1, 2, 3, 4, 5];
-        let filter = level.filter_array_indices();
-        assert_eq!(expected, filter);
-    }
-
     #[test]
     fn test_null_vs_nonnull_struct() {
         // define schema
@@ -1571,9 +1082,8 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
                 .unwrap();
 
-        let batch_level = LevelInfo::new(0, batch.num_rows());
         let struct_null_level =
-            batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
+            calculate_array_levels(batch.column(0), batch.schema().field(0));
 
         // create second batch
         // define schema
@@ -1595,9 +1105,8 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
                 .unwrap();
 
-        let batch_level = LevelInfo::new(0, batch.num_rows());
         let struct_non_null_level =
-            batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
+            calculate_array_levels(batch.column(0), batch.schema().field(0));
 
         // The 2 levels should not be the same
         if struct_non_null_level == struct_null_level {
@@ -1634,20 +1143,6 @@ mod tests {
 
         let batch = reader.next().unwrap().unwrap();
 
-        let expected_batch_level = LevelInfo {
-            definition: vec![0; 3],
-            repetition: None,
-            array_offsets: (0..=3).collect(),
-            array_mask: vec![true, true, true],
-            max_definition: 0,
-            level_type: LevelType::Root,
-            offset: 0,
-            length: 3,
-        };
-
-        let batch_level = LevelInfo::new(0, 3);
-        assert_eq!(&batch_level, &expected_batch_level);
-
         // calculate the map's level
         let mut levels = vec![];
         batch
@@ -1655,7 +1150,7 @@ mod tests {
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels = batch_level.calculate_array_levels(array, field);
+                let mut array_levels = calculate_array_levels(array, field).unwrap();
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 2);
@@ -1664,14 +1159,11 @@ mod tests {
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![1; 7],
-            repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]),
-            array_offsets: vec![0, 2, 4, 7],
-            array_mask: vec![true; 7],
-            max_definition: 1,
-            level_type: LevelType::Primitive(false),
-            offset: 0,
-            length: 7,
+            def_levels: Some(vec![1; 7]),
+            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
+            max_def_level: 1,
+            max_rep_level: 1,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1679,14 +1171,11 @@ mod tests {
         let list_level = levels.get(1).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![2, 2, 2, 1, 2, 1, 2],
-            repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]),
-            array_offsets: vec![0, 2, 4, 7],
-            array_mask: vec![true, true, true, false, true, false, true],
-            max_definition: 2,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 7,
+            def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
+            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+            non_null_indices: vec![0, 1, 2, 4, 6],
+            max_def_level: 2,
+            max_rep_level: 1,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1760,63 +1249,155 @@ mod tests {
 
         let array = Arc::new(list_builder.finish());
 
+        let values_len = array.data().child_data()[0].len();
+        assert_eq!(values_len, 5);
+
         let schema = Arc::new(Schema::new(vec![list_field]));
 
         let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
 
-        let batch_level = LevelInfo::new(0, rb.num_rows());
-        let list_level =
-            &batch_level.calculate_array_levels(rb.column(0), rb.schema().field(0))[0];
+        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
+        let list_level = &levels[0];
 
         let expected_level = LevelInfo {
-            definition: vec![4, 1, 0, 2, 2, 3, 4],
-            repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
-            array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
-            array_mask: vec![true, true, false, false, false, false, true],
-            max_definition: 4,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+            def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
+            rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
+            non_null_indices: vec![0, 4],
+            max_def_level: 4,
+            max_rep_level: 1,
         };
 
         assert_eq!(list_level, &expected_level);
     }
 
     #[test]
-    fn test_nested_indices() {
-        // Given a buffer like
-        // [0, null, null, 1, 2]
-        //
-        // The two level infos below might represent the two structures
-        // 1: [{a: 0}], [], null, [null, null], [{a: 1}], [{a: 2}]
-        // 2: [0], [], null, [null, null], [1], [2]
-        //
-        // (That is, their only difference is that the leaf values are nested one level deeper in a
-        // struct).
-
-        let level1 = LevelInfo {
-            definition: vec![4, 1, 0, 2, 2, 4, 4],
-            repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
-            array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
-            array_mask: vec![true, true, false, false, false, false, true],
-            max_definition: 4,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+    fn test_struct_mask_list() {
+        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
+        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2)]),
+            Some(vec![None]),
+            Some(vec![]),
+            Some(vec![Some(3), None]), // Masked by struct array
+            Some(vec![Some(4), Some(5)]),
+            None, // Masked by struct array
+            None,
+        ]);
+
+        // This test assumes that nulls don't take up space
+        assert_eq!(inner.data().child_data()[0].len(), 7);
+
+        let field = Field::new("list", inner.data_type().clone(), true);
+        let array = Arc::new(inner) as ArrayRef;
+        let nulls = Buffer::from([0b01010111]);
+        let struct_a = StructArray::from((vec![(field, array)], nulls));
+
+        let field = Field::new("struct", struct_a.data_type().clone(), true);
+        let array = Arc::new(struct_a) as ArrayRef;
+        let levels = calculate_array_levels(&array, &field).unwrap();
+
+        assert_eq!(levels.len(), 1);
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
+            rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
+            non_null_indices: vec![0, 1, 5, 6],
+            max_def_level: 4,
+            max_rep_level: 1,
         };
 
-        let level2 = LevelInfo {
-            definition: vec![3, 1, 0, 2, 2, 3, 3],
-            repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
-            array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
-            array_mask: vec![true, true, false, false, false, false, true],
-            max_definition: 3,
-            level_type: LevelType::Primitive(true),
-            offset: 0,
-            length: 5,
+        assert_eq!(&levels[0], &expected_level);
+    }
+
+    #[test]
+    fn test_list_mask_struct() {
+        // Test the null mask of a struct array and the null mask of a list array
+        // masking out non-null elements of their children
+
+        let a1 = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![None]), // Masked by list array
+            Some(vec![]),     // Masked by list array
+            Some(vec![Some(3), None]),
+            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
+            None,
+            None,
+        ])) as ArrayRef;
+
+        let a2 = Arc::new(Int32Array::from_iter(vec![
+            Some(1), // Masked by list array
+            Some(2), // Masked by list array
+            None,
+            Some(4), // Masked by struct array
+            Some(5),
+            None,
+        ])) as ArrayRef;
+
+        let field_a1 = Field::new("list", a1.data_type().clone(), true);
+        let field_a2 = Field::new("integers", a2.data_type().clone(), true);
+
+        let nulls = Buffer::from([0b00110111]);
+        let struct_a = Arc::new(
+            StructArray::try_from((vec![(field_a1, a1), (field_a2, a2)], nulls)).unwrap(),
+        ) as ArrayRef;
+
+        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
+        let nulls = Buffer::from([0b00111100]);
+
+        let list_type = DataType::List(Box::new(Field::new(
+            "struct",
+            struct_a.data_type().clone(),
+            true,
+        )));
+
+        let data = ArrayDataBuilder::new(list_type.clone())
+            .len(6)
+            .null_bit_buffer(Some(nulls))
+            .add_buffer(offsets)
+            .add_child_data(struct_a.data().clone())
+            .build()
+            .unwrap();
+
+        let list = make_array(data);
+        let list_field = Field::new("col", list_type, true);
+
+        let expected = vec![
+            r#"+-------------------------------------+"#,
+            r#"| col                                 |"#,
+            r#"+-------------------------------------+"#,
+            r#"|                                     |"#,
+            r#"|                                     |"#,
+            r#"| []                                  |"#,
+            r#"| [{"list": [3, ], "integers": null}] |"#,
+            r#"| [, {"list": null, "integers": 5}]   |"#,
+            r#"| []                                  |"#,
+            r#"+-------------------------------------+"#,
+        ]
+        .join("\n");
+
+        let pretty = pretty_format_columns(list_field.name(), &[list.clone()]).unwrap();
+        assert_eq!(pretty.to_string(), expected);
+
+        let levels = calculate_array_levels(&list, &list_field).unwrap();
+
+        assert_eq!(levels.len(), 2);
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
+            rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
+            non_null_indices: vec![1],
+            max_def_level: 6,
+            max_rep_level: 2,
+        };
+
+        assert_eq!(&levels[0], &expected_level);
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
+            rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
+            non_null_indices: vec![4],
+            max_def_level: 4,
+            max_rep_level: 1,
         };
 
-        // filter_array_indices should return the same indices in this case.
-        assert_eq!(level1.filter_array_indices(), level2.filter_array_indices());
+        assert_eq!(&levels[1], &expected_level);
     }
 }