You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/05/11 05:42:49 UTC

[arrow-rs] branch master updated: Fix null struct and list roundtrip (#270)

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8226219  Fix null struct and list roundtrip (#270)
8226219 is described below

commit 8226219fe7104f6c8a2740806f96f02c960d991c
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Tue May 11 07:42:41 2021 +0200

    Fix null struct and list roundtrip (#270)
    
    * fix null struct and list inconsistencies in writer
    
    * fix list reader null and empty slot calculation
    
    * remove stray TODOs
---
 parquet/src/arrow/array_reader.rs |  95 ++++-----
 parquet/src/arrow/arrow_writer.rs |  54 ++---
 parquet/src/arrow/levels.rs       | 430 +++++++++++++++++---------------------
 3 files changed, 265 insertions(+), 314 deletions(-)

diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f209b8b..f54e446 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -615,6 +615,8 @@ pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
     item_type: ArrowType,
     list_def_level: i16,
     list_rep_level: i16,
+    list_empty_def_level: i16,
+    list_null_def_level: i16,
     def_level_buffer: Option<Buffer>,
     rep_level_buffer: Option<Buffer>,
     _marker: PhantomData<OffsetSize>,
@@ -628,6 +630,8 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
         item_type: ArrowType,
         def_level: i16,
         rep_level: i16,
+        list_null_def_level: i16,
+        list_empty_def_level: i16,
     ) -> Self {
         Self {
             item_reader,
@@ -635,6 +639,8 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
             item_type,
             list_def_level: def_level,
             list_rep_level: rep_level,
+            list_null_def_level,
+            list_empty_def_level,
             def_level_buffer: None,
             rep_level_buffer: None,
             _marker: PhantomData,
@@ -843,61 +849,49 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
         // Where n is the max definition level of the list's parent.
         // If a Parquet schema's only leaf is the list, then n = 0.
 
-        // TODO: ARROW-10391 - add a test case with a non-nullable child, check if max is 3
-        let list_field_type = match self.get_data_type() {
-            ArrowType::List(field)
-            | ArrowType::FixedSizeList(field, _)
-            | ArrowType::LargeList(field) => field,
-            _ => {
-                // Panic: this is safe as we only write lists from list datatypes
-                unreachable!()
-            }
-        };
-        let max_list_def_range = if list_field_type.is_nullable() { 3 } else { 2 };
-        let max_list_definition = *(def_levels.iter().max().unwrap());
-        // TODO: ARROW-10391 - Find a reliable way of validating deeply-nested lists
-        // debug_assert!(
-        //     max_list_definition >= max_list_def_range,
-        //     "Lift definition max less than range"
-        // );
-        let list_null_def = max_list_definition - max_list_def_range;
-        let list_empty_def = max_list_definition - 1;
-        let mut null_list_indices: Vec<usize> = Vec::new();
-        for i in 0..def_levels.len() {
-            if def_levels[i] == list_null_def {
-                null_list_indices.push(i);
-            }
-        }
+        // Slots at or below the empty-list definition level are placeholders in the child array; collect their indices for removal
+        let null_list_indices: Vec<usize> = def_levels
+            .iter()
+            .enumerate()
+            .filter_map(|(index, def)| {
+                if *def <= self.list_empty_def_level {
+                    Some(index)
+                } else {
+                    None
+                }
+            })
+            .collect();
         let batch_values = match null_list_indices.len() {
             0 => next_batch_array.clone(),
             _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?,
         };
 
-        // null list has def_level = 0
-        // empty list has def_level = 1
-        // null item in a list has def_level = 2
-        // non-null item has def_level = 3
         // first item in each list has rep_level = 0, subsequent items have rep_level = 1
-
         let mut offsets: Vec<OffsetSize> = Vec::new();
         let mut cur_offset = OffsetSize::zero();
-        for i in 0..rep_levels.len() {
-            if rep_levels[i] == 0 {
-                offsets.push(cur_offset)
+        def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
+            if *r == 0 || d == &self.list_empty_def_level {
+                offsets.push(cur_offset);
             }
-            if def_levels[i] >= list_empty_def {
+            if d > &self.list_empty_def_level {
                 cur_offset += OffsetSize::one();
             }
-        }
+        });
         offsets.push(cur_offset);
 
         let num_bytes = bit_util::ceil(offsets.len(), 8);
-        let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+        // TODO: A useful optimization is to use the null count to fill with
+        // 0 or null, to reduce individual bits set in a loop.
+        // To favour dense data, set every slot to true, then unset
+        let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
         let null_slice = null_buf.as_slice_mut();
         let mut list_index = 0;
         for i in 0..rep_levels.len() {
-            if rep_levels[i] == 0 && def_levels[i] != 0 {
-                bit_util::set_bit(null_slice, list_index);
+            // If the level is lower than empty, then the slot is null.
+            // When a list is non-nullable, its empty level = null level,
+            // so this automatically factors that in.
+            if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
+                bit_util::unset_bit(null_slice, list_index);
             }
             if rep_levels[i] == 0 {
                 list_index += 1;
@@ -1282,16 +1276,15 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
         let mut new_context = context.clone();
 
         new_context.path.append(vec![list_type.name().to_string()]);
+        // We need to know at what definition a list or its child is null
+        let list_null_def = new_context.def_level;
+        let mut list_empty_def = new_context.def_level;
 
-        match list_type.get_basic_info().repetition() {
-            Repetition::REPEATED => {
-                new_context.def_level += 1;
-                new_context.rep_level += 1;
-            }
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            _ => (),
+        // If the list's root is nullable
+        if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
+            new_context.def_level += 1;
+            // current level is nullable, increment to get level for empty list slot
+            list_empty_def += 1;
         }
 
         match list_child.get_basic_info().repetition() {
@@ -1350,6 +1343,8 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
                         item_reader_type,
                         new_context.def_level,
                         new_context.rep_level,
+                        list_null_def,
+                        list_empty_def,
                     )),
                     ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
                         item_reader,
@@ -1357,6 +1352,8 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
                         item_reader_type,
                         new_context.def_level,
                         new_context.rep_level,
+                        list_null_def,
+                        list_empty_def,
                     )),
 
                     _ => {
@@ -2468,6 +2465,8 @@ mod tests {
             ArrowType::Int32,
             1,
             1,
+            0,
+            1,
         );
 
         let next_batch = list_array_reader.next_batch(1024).unwrap();
@@ -2522,6 +2521,8 @@ mod tests {
             ArrowType::Int32,
             1,
             1,
+            0,
+            1,
         );
 
         let next_batch = list_array_reader.next_batch(1024).unwrap();
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index df6ce98..be278ed 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -91,7 +91,7 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
         let batch_level = LevelInfo::new_from_batch(batch);
         let mut row_group_writer = self.writer.next_row_group()?;
         for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
-            let mut levels = batch_level.calculate_array_levels(array, field, false);
+            let mut levels = batch_level.calculate_array_levels(array, field);
             // Reverse levels as we pop() them when writing arrays
             levels.reverse();
             write_leaves(&mut row_group_writer, array, &mut levels)?;
@@ -793,25 +793,29 @@ mod tests {
         let struct_field_g = Field::new(
             "g",
             DataType::List(Box::new(Field::new("item", DataType::Int16, true))),
+            false,
+        );
+        let struct_field_h = Field::new(
+            "h",
+            DataType::List(Box::new(Field::new("item", DataType::Int16, false))),
             true,
         );
         let struct_field_e = Field::new(
             "e",
-            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
-            true,
+            DataType::Struct(vec![
+                struct_field_f.clone(),
+                struct_field_g.clone(),
+                struct_field_h.clone(),
+            ]),
+            false,
         );
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, true),
-            // Note: when the below struct is set to non-nullable, this test fails,
-            // but the output data written is correct.
-            // Interestingly, pyarrow will read it correctly, but pyspark fails to.
-            // This might be a compatibility quirk between arrow and parquet.
-            // We have opened https://github.com/apache/arrow-rs/issues/245 to investigate
             Field::new(
                 "c",
                 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
-                true,
+                false,
             ),
         ]);
 
@@ -831,15 +835,23 @@ mod tests {
         // Construct a list array from the above two
         let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
             .len(5)
-            .add_buffer(g_value_offsets)
+            .add_buffer(g_value_offsets.clone())
             .add_child_data(g_value.data().clone())
-            // .null_bit_buffer(Buffer::from(vec![0b00011011])) // TODO: add to test after resolving other issues
             .build();
         let g = ListArray::from(g_list_data);
+        // The difference between g and h is that h has a null bitmap
+        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
+            .len(5)
+            .add_buffer(g_value_offsets)
+            .add_child_data(g_value.data().clone())
+            .null_bit_buffer(Buffer::from(vec![0b00011011]))
+            .build();
+        let h = ListArray::from(h_list_data);
 
         let e = StructArray::from(vec![
             (struct_field_f, Arc::new(f) as ArrayRef),
             (struct_field_g, Arc::new(g) as ArrayRef),
+            (struct_field_h, Arc::new(h) as ArrayRef),
         ]);
 
         let c = StructArray::from(vec![
@@ -860,14 +872,10 @@ mod tests {
     #[test]
     fn arrow_writer_complex_mixed() {
         // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
-        // Only writing the "offest_field" column works when "some_nested_object" is non-null.
-        // This indicates that a non-null struct should not have a null child (with null values).
-        // One observation is that spark doesn't consider the parent struct's nullness,
-        // and so, we should investigate the impact of always treating structs as null.
-        // See https://github.com/apache/arrow-rs/issues/245.
+        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
 
         // define schema
-        let offset_field = Field::new("offset", DataType::Int32, true);
+        let offset_field = Field::new("offset", DataType::Int32, false);
         let partition_field = Field::new("partition", DataType::Int64, true);
         let topic_field = Field::new("topic", DataType::Utf8, true);
         let schema = Schema::new(vec![Field::new(
@@ -877,7 +885,7 @@ mod tests {
                 partition_field.clone(),
                 topic_field.clone(),
             ]),
-            true,
+            false,
         )]);
 
         // create some data
@@ -970,14 +978,10 @@ mod tests {
         let schema = Schema::new(vec![field_a.clone()]);
 
         // create data
-        // When the null buffer of the struct is created, this test fails.
-        // It appears that the nullness of the struct is ignored when the
-        // struct is read back.
-        // See https://github.com/apache/arrow-rs/issues/245
         let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
         let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
             .len(6)
-            // .null_bit_buffer(Buffer::from(vec![0b00100111]))
+            .null_bit_buffer(Buffer::from(vec![0b00100111]))
             .add_child_data(c.data().clone())
             .build();
         let b = StructArray::from(b_data);
@@ -989,7 +993,7 @@ mod tests {
         let a = StructArray::from(a_data);
 
         assert_eq!(a.null_count(), 0);
-        assert_eq!(a.column(0).null_count(), 0);
+        assert_eq!(a.column(0).null_count(), 2);
 
         // build a racord batch
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
@@ -1362,7 +1366,7 @@ mod tests {
         let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
             "item",
             DataType::Int32,
-            true, // TODO: why does this fail when false? Is it related to logical nulls?
+            false,
         ))))
         .len(5)
         .add_buffer(a_value_offsets)
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 2669581..be56726 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -65,10 +65,29 @@ pub(crate) struct LevelInfo {
     pub array_mask: Vec<bool>,
     /// The maximum definition at this level, 0 at the record batch
     pub max_definition: i16,
-    /// Whether this array or any of its parents is a list
-    pub is_list: bool,
-    /// Whether the current array is nullable (affects definition levels)
-    pub is_nullable: bool,
+    /// The type of array represented by this level info
+    pub level_type: LevelType,
+}
+
+/// LevelType defines the type of level, and whether it is nullable or not
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(crate) enum LevelType {
+    Root,
+    List(bool),
+    Struct(bool),
+    Primitive(bool),
+}
+
+impl LevelType {
+    #[inline]
+    const fn level_increment(&self) -> i16 {
+        match self {
+            LevelType::Root => 0,
+            LevelType::List(is_nullable)
+            | LevelType::Struct(is_nullable)
+            | LevelType::Primitive(is_nullable) => *is_nullable as i16,
+        }
+    }
 }
 
 impl LevelInfo {
@@ -87,10 +106,7 @@ impl LevelInfo {
             // all values at a batch-level are non-null
             array_mask: vec![true; num_rows],
             max_definition: 0,
-            is_list: false,
-            // a batch is treated as nullable even though it has no nulls,
-            // this is required to compute nested type levels correctly
-            is_nullable: false,
+            level_type: LevelType::Root,
         }
     }
 
@@ -110,7 +126,6 @@ impl LevelInfo {
         &self,
         array: &ArrayRef,
         field: &Field,
-        is_parent_struct: bool,
     ) -> Vec<Self> {
         let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array);
         match array.data_type() {
@@ -120,8 +135,8 @@ impl LevelInfo {
                 array_offsets: self.array_offsets.clone(),
                 array_mask,
                 max_definition: self.max_definition.max(1),
-                is_list: self.is_list,
-                is_nullable: true, // always nullable as all values are nulls
+                // Null type is always nullable
+                level_type: LevelType::Primitive(true),
             }],
             DataType::Boolean
             | DataType::Int8
@@ -152,9 +167,7 @@ impl LevelInfo {
                 vec![self.calculate_child_levels(
                     array_offsets,
                     array_mask,
-                    false,
-                    is_parent_struct,
-                    field.is_nullable(),
+                    LevelType::Primitive(field.is_nullable()),
                 )]
             }
             DataType::List(list_field) | DataType::LargeList(list_field) => {
@@ -162,10 +175,7 @@ impl LevelInfo {
                 let list_level = self.calculate_child_levels(
                     array_offsets,
                     array_mask,
-                    true,
-                    // the list could come from a struct, but its children will all be false
-                    is_parent_struct,
-                    field.is_nullable(),
+                    LevelType::List(field.is_nullable()),
                 );
 
                 // Construct the child array of the list, and get its offset + mask
@@ -207,13 +217,11 @@ impl LevelInfo {
                         vec![list_level.calculate_child_levels(
                             child_offsets,
                             child_mask,
-                            false,
-                            false, // always false
-                            list_field.is_nullable(),
+                            LevelType::Primitive(list_field.is_nullable()),
                         )]
                     }
                     DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
-                        list_level.calculate_array_levels(&child_array, list_field, false)
+                        list_level.calculate_array_levels(&child_array, list_field)
                     }
                     DataType::FixedSizeList(_, _) => unimplemented!(),
                     DataType::Union(_) => unimplemented!(),
@@ -228,10 +236,7 @@ impl LevelInfo {
                 let struct_level = self.calculate_child_levels(
                     array_offsets,
                     array_mask,
-                    false,
-                    // struct's own parent could be a struct
-                    is_parent_struct,
-                    field.is_nullable(),
+                    LevelType::Struct(field.is_nullable()),
                 );
                 let mut struct_levels = vec![];
                 struct_array
@@ -239,12 +244,8 @@ impl LevelInfo {
                     .into_iter()
                     .zip(struct_fields)
                     .for_each(|(child_array, child_field)| {
-                        let mut levels = struct_level.calculate_array_levels(
-                            child_array,
-                            child_field,
-                            // this is the only place where this is always true
-                            true,
-                        );
+                        let mut levels =
+                            struct_level.calculate_array_levels(child_array, child_field);
                         struct_levels.append(&mut levels);
                     });
                 struct_levels
@@ -258,9 +259,7 @@ impl LevelInfo {
                 vec![self.calculate_child_levels(
                     array_offsets,
                     array_mask,
-                    false,
-                    is_parent_struct,
-                    field.is_nullable(),
+                    LevelType::Primitive(field.is_nullable()),
                 )]
             }
         }
@@ -315,80 +314,39 @@ impl LevelInfo {
         // we use 64-bit offsets to also accommodate large arrays
         array_offsets: Vec<i64>,
         array_mask: Vec<bool>,
-        is_list: bool,
-        is_parent_struct: bool,
-        is_nullable: bool,
+        level_type: LevelType,
     ) -> Self {
         let min_len = *(array_offsets.last().unwrap()) as usize;
         let mut definition = Vec::with_capacity(min_len);
         let mut repetition = Vec::with_capacity(min_len);
         let mut merged_array_mask = Vec::with_capacity(min_len);
 
-        // determine the total level increment based on data types
-        let max_definition = match is_list {
-            false => {
-                // first exception, start of a batch, and not list
-                if self.max_definition == 0 {
-                    1
-                } else if self.is_list {
-                    // second exception, always increment after a list
-                    self.max_definition + 1
-                } else if is_parent_struct && !self.is_nullable {
-                    // if the parent is a non-null struct, don't increment
-                    self.max_definition
-                } else {
-                    self.max_definition + is_nullable as i16
-                }
+        let max_definition = match (self.level_type, level_type) {
+            (LevelType::Root, LevelType::Struct(is_nullable)) => {
+                // If the struct is non-nullable, its def level doesn't increment
+                is_nullable as i16
             }
-            true => {
-                if is_parent_struct && !self.is_nullable {
-                    self.max_definition + is_nullable as i16
-                } else {
-                    self.max_definition + 1 + is_nullable as i16
-                }
+            (LevelType::Root, _) => 1,
+            (_, LevelType::Root) => {
+                unreachable!("Cannot have a root as a child")
+            }
+            (LevelType::List(_), _) => {
+                self.max_definition + 1 + level_type.level_increment()
+            }
+            (LevelType::Struct(_), _) => {
+                self.max_definition + level_type.level_increment()
+            }
+            (_, LevelType::List(is_nullable)) => {
+                // the child is a list; root, list and struct parents are already handled by the arms above
+                self.max_definition + 1 + is_nullable as i16
+            }
+            (LevelType::Primitive(_), _) => {
+                unreachable!("Cannot have a primitive parent for any type")
             }
         };
 
-        match (self.is_list, is_list) {
-            (false, false) => {
-                self.definition
-                    .iter()
-                    .zip(array_mask.into_iter().zip(&self.array_mask))
-                    .for_each(|(def, (child_mask, parent_mask))| {
-                        merged_array_mask.push(*parent_mask && child_mask);
-                        match (parent_mask, child_mask) {
-                            (true, true) => {
-                                definition.push(max_definition);
-                            }
-                            (true, false) => {
-                                // The child is only legally null if its array is nullable.
-                                // Thus parent's max_definition is lower
-                                definition.push(if *def <= self.max_definition {
-                                    *def
-                                } else {
-                                    self.max_definition
-                                });
-                            }
-                            // if the parent was false, retain its definitions
-                            (false, _) => {
-                                definition.push(*def);
-                            }
-                        }
-                    });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                Self {
-                    definition,
-                    repetition: self.repetition.clone(), // it's None
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    is_list: false,
-                    is_nullable,
-                }
-            }
-            (true, true) => {
+        match (self.level_type, level_type) {
+            (LevelType::List(_), LevelType::List(is_nullable)) => {
                 // parent is a list or descendant of a list, and child is a list
                 let reps = self.repetition.clone().unwrap();
                 // Calculate the 2 list hierarchy definitions in advance
@@ -466,11 +424,10 @@ impl LevelInfo {
                     array_offsets,
                     array_mask: merged_array_mask,
                     max_definition,
-                    is_list: true,
-                    is_nullable,
+                    level_type,
                 }
             }
-            (true, false) => {
+            (LevelType::List(_), _) => {
                 // List and primitive (or struct).
                 // The list can have more values than the primitive, indicating that there
                 // are slots where the list is empty. We use a counter to track this behaviour.
@@ -519,11 +476,10 @@ impl LevelInfo {
                     array_offsets: self.array_offsets.clone(),
                     array_mask: merged_array_mask,
                     max_definition,
-                    is_list: true,
-                    is_nullable,
+                    level_type,
                 }
             }
-            (false, true) => {
+            (_, LevelType::List(is_nullable)) => {
                 // Encountering a list for the first time.
                 // Calculate the 2 list hierarchy definitions in advance
 
@@ -545,11 +501,7 @@ impl LevelInfo {
                         match (parent_mask, child_len) {
                             (true, 0) => {
                                 // empty slot that is valid, i.e. {"parent": {"child": [] } }
-                                definition.push(if child_mask {
-                                    l2
-                                } else {
-                                    self.max_definition
-                                });
+                                definition.push(if child_mask { l3 } else { l2 });
                                 repetition.push(0);
                                 merged_array_mask.push(child_mask);
                             }
@@ -593,8 +545,44 @@ impl LevelInfo {
                     array_offsets,
                     array_mask: merged_array_mask,
                     max_definition,
-                    is_list: true,
-                    is_nullable,
+                    level_type,
+                }
+            }
+            (_, _) => {
+                self.definition
+                    .iter()
+                    .zip(array_mask.into_iter().zip(&self.array_mask))
+                    .for_each(|(current_def, (child_mask, parent_mask))| {
+                        merged_array_mask.push(*parent_mask && child_mask);
+                        match (parent_mask, child_mask) {
+                            (true, true) => {
+                                definition.push(max_definition);
+                            }
+                            (true, false) => {
+                                // The child is only legally null if its array is nullable.
+                                // Thus parent's max_definition is lower
+                                definition.push(if *current_def <= self.max_definition {
+                                    *current_def
+                                } else {
+                                    self.max_definition
+                                });
+                            }
+                            // if the parent was false, retain its definitions
+                            (false, _) => {
+                                definition.push(*current_def);
+                            }
+                        }
+                    });
+
+                debug_assert_eq!(definition.len(), merged_array_mask.len());
+
+                Self {
+                    definition,
+                    repetition: self.repetition.clone(), // it's None
+                    array_offsets,
+                    array_mask: merged_array_mask,
+                    max_definition,
+                    level_type,
                 }
             }
         }
@@ -647,14 +635,20 @@ impl LevelInfo {
                     .into_iter()
                     .map(|v| v as i64)
                     .collect::<Vec<i64>>();
-                let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect();
-                (offsets, masks)
+                let array_mask = match array.data().null_buffer() {
+                    Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
+                    None => vec![true; array.len()],
+                };
+                (offsets, array_mask)
             }
             DataType::LargeList(_) => {
                 let offsets =
                     unsafe { array.data().buffers()[0].typed_data::<i64>() }.to_vec();
-                let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect();
-                (offsets, masks)
+                let array_mask = match array.data().null_buffer() {
+                    Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
+                    None => vec![true; array.len()],
+                };
+                (offsets, array_mask)
             }
             DataType::FixedSizeBinary(value_len) => {
                 let array_mask = match array.data().null_buffer() {
@@ -676,7 +670,14 @@ impl LevelInfo {
     /// Given a level's information, calculate the offsets required to index an array correctly.
     pub(crate) fn filter_array_indices(&self) -> Vec<usize> {
         // happy path if not dealing with lists
-        if !self.is_list {
+        let is_nullable = match self.level_type {
+            LevelType::Primitive(is_nullable) => is_nullable,
+            _ => panic!(
+                "Cannot filter indices on a non-primitive array, found {:?}",
+                self.level_type
+            ),
+        };
+        if self.repetition.is_none() {
             return self
                 .definition
                 .iter()
@@ -697,7 +698,7 @@ impl LevelInfo {
             if *def == self.max_definition {
                 filtered.push(index);
             }
-            if *def >= self.max_definition - self.is_nullable as i16 {
+            if *def >= self.max_definition - is_nullable as i16 {
                 index += 1;
             }
         });
@@ -746,8 +747,7 @@ mod tests {
             array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
             array_mask: vec![true, true], // both lists defined
             max_definition: 0,
-            is_list: false,     // root is never list
-            is_nullable: false, // root in example is non-nullable
+            level_type: LevelType::Root,
         };
         // offset into array, each level1 has 2 values
         let array_offsets = vec![0, 2, 4];
@@ -757,9 +757,7 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            false,
+            LevelType::List(false),
         );
         //
         let expected_levels = LevelInfo {
@@ -768,8 +766,7 @@ mod tests {
             array_offsets,
             array_mask: vec![true, true, true, true],
             max_definition: 1,
-            is_list: true,
-            is_nullable: false,
+            level_type: LevelType::List(false),
         };
         // the separate asserts make it easier to see what's failing
         assert_eq!(&levels.definition, &expected_levels.definition);
@@ -777,8 +774,7 @@ mod tests {
         assert_eq!(&levels.array_mask, &expected_levels.array_mask);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         // this assert is to help if there are more variables added to the struct
         assert_eq!(&levels, &expected_levels);
 
@@ -789,9 +785,7 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            false,
+            LevelType::List(false),
         );
         let expected_levels = LevelInfo {
             definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
@@ -799,16 +793,14 @@ mod tests {
             array_offsets,
             array_mask: vec![true; 10],
             max_definition: 2,
-            is_list: true,
-            is_nullable: false,
+            level_type: LevelType::List(false),
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
         assert_eq!(&levels.array_mask, &expected_levels.array_mask);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         assert_eq!(&levels, &expected_levels);
     }
 
@@ -821,8 +813,7 @@ mod tests {
             array_offsets: (0..=10).collect(),
             array_mask: vec![true; 10],
             max_definition: 0,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Root,
         };
         let array_offsets: Vec<i64> = (0..=10).collect();
         let array_mask = vec![true; 10];
@@ -830,9 +821,7 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask.clone(),
-            false,
-            false,
-            false,
+            LevelType::Primitive(false),
         );
         let expected_levels = LevelInfo {
             definition: vec![1; 10],
@@ -840,8 +829,7 @@ mod tests {
             array_offsets,
             array_mask,
             max_definition: 1,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Primitive(false),
         };
         assert_eq!(&levels, &expected_levels);
     }
@@ -855,8 +843,7 @@ mod tests {
             array_offsets: (0..=5).collect(),
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Root,
         };
         let array_offsets: Vec<i64> = (0..=5).collect();
         let array_mask = vec![true, false, true, true, false];
@@ -864,9 +851,7 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask.clone(),
-            false,
-            false,
-            true,
+            LevelType::Primitive(true),
         );
         let expected_levels = LevelInfo {
             definition: vec![1, 0, 1, 1, 0],
@@ -874,8 +859,7 @@ mod tests {
             array_offsets,
             array_mask,
             max_definition: 1,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Primitive(true),
         };
         assert_eq!(&levels, &expected_levels);
     }
@@ -890,8 +874,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Root,
         };
         let array_offsets = vec![0, 2, 2, 4, 8, 11];
         let array_mask = vec![true, false, true, true, true];
@@ -899,9 +882,7 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            true,
+            LevelType::List(true),
         );
         // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
         // all values are defined as we do not have nulls on the root (batch)
@@ -912,22 +893,24 @@ mod tests {
         //   3: 0, 1, 1, 1
         //   4: 0, 1, 1
         let expected_levels = LevelInfo {
-            definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
+            // The levels are normally 2 because we:
+            // - Calculate the level at the list
+            // - Calculate the level at the list's child
+            // We do not do this in these tests, thus the levels are 1 less.
+            definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
             repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
             array_offsets,
             array_mask: vec![
                 true, true, false, true, true, true, true, true, true, true, true, true,
             ],
-            max_definition: 2,
-            is_list: true,
-            is_nullable: true,
+            max_definition: 1,
+            level_type: LevelType::List(true),
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         assert_eq!(&levels, &expected_levels);
     }
 
@@ -952,8 +935,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![false, true, false, true, true],
             max_definition: 1,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Struct(true),
         };
         let array_offsets = vec![0, 2, 2, 4, 8, 11];
         let array_mask = vec![true, false, true, true, true];
@@ -961,9 +943,7 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            true,
+            LevelType::List(true),
         );
         let expected_levels = LevelInfo {
             // 0 1 [2] are 0 (not defined at level 1)
@@ -971,23 +951,21 @@ mod tests {
             // 2 3 [4] are 0
             // 4 5 6 7 [8] are 1 (defined at level 1 only)
             // 8 9 10 [11] are 2 (defined at both levels)
-            definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
+            definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
             repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
             array_offsets,
             array_mask: vec![
                 false, false, false, false, false, true, true, true, true, true, true,
                 true,
             ],
-            max_definition: 3,
-            is_nullable: true,
-            is_list: true,
+            max_definition: 2,
+            level_type: LevelType::List(true),
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         assert_eq!(&levels, &expected_levels);
 
         // nested lists (using previous test)
@@ -999,9 +977,7 @@ mod tests {
         let levels = nested_parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            true,
+            LevelType::List(true),
         );
         let expected_levels = LevelInfo {
             // (def: 0) 0 1 [2] are 0 (take parent)
@@ -1028,7 +1004,7 @@ mod tests {
             // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
             // 4: [[116, 117], [118, 119], [120, 121]]
             definition: vec![
-                0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
+                0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
             ],
             repetition: Some(vec![
                 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
@@ -1039,17 +1015,15 @@ mod tests {
                 true, true, true, true, true, true, true, true, true, true, true, true,
                 true,
             ],
-            max_definition: 5,
-            is_nullable: true,
-            is_list: true,
+            max_definition: 4,
+            level_type: LevelType::List(true),
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
         assert_eq!(&levels.array_mask, &expected_levels.array_mask);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         assert_eq!(&levels, &expected_levels);
     }
 
@@ -1067,8 +1041,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4],
             array_mask: vec![true, true, true, true],
             max_definition: 1,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Struct(true),
         };
         // 0: null ([], but mask is false, so it's not just an empty list)
         // 1: [1, 2, 3]
@@ -1080,29 +1053,25 @@ mod tests {
         let levels = parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            true,
+            LevelType::List(true),
         );
         // 0: [null], level 1 is defined, but not 2
         // 1: [1, 2, 3]
         // 2: [4, 5]
         // 3: [6, 7]
         let expected_levels = LevelInfo {
-            definition: vec![2, 3, 3, 3, 3, 3, 3, 3],
+            definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
             repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
             array_offsets,
             array_mask: vec![false, true, true, true, true, true, true, true],
-            max_definition: 3,
-            is_list: true,
-            is_nullable: true,
+            max_definition: 2,
+            level_type: LevelType::List(true),
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         assert_eq!(&levels, &expected_levels);
 
         // nested lists (using previous test)
@@ -1121,9 +1090,7 @@ mod tests {
         let levels = nested_parent_levels.calculate_child_levels(
             array_offsets.clone(),
             array_mask,
-            true,
-            false,
-            true,
+            LevelType::List(true),
         );
         // We have 7 array values, and at least 15 primitives (from array_offsets)
         // 0: (-)[null], parent was null, no value populated here
@@ -1137,24 +1104,22 @@ mod tests {
         // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
         // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
         let expected_levels = LevelInfo {
-            definition: vec![2, 5, 5, 5, 3, 5, 5, 5, 5, 5, 5, 5, 3, 5, 5, 5, 5, 5],
+            definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
             repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
             array_mask: vec![
                 false, true, true, true, false, true, true, true, true, true, true, true,
                 true, true, true, true, true, true,
             ],
             array_offsets,
-            is_list: true,
-            is_nullable: true,
-            max_definition: 5,
+            max_definition: 4,
+            level_type: LevelType::List(true),
         };
         assert_eq!(&levels.definition, &expected_levels.definition);
         assert_eq!(&levels.repetition, &expected_levels.repetition);
         assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
         assert_eq!(&levels.array_mask, &expected_levels.array_mask);
         assert_eq!(&levels.max_definition, &expected_levels.max_definition);
-        assert_eq!(&levels.is_list, &expected_levels.is_list);
-        assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+        assert_eq!(&levels.level_type, &expected_levels.level_type);
         assert_eq!(&levels, &expected_levels);
     }
 
@@ -1174,8 +1139,7 @@ mod tests {
             array_offsets: (0..=6).collect(),
             array_mask: vec![true, true, true, true, false, true],
             max_definition: 1,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Struct(true),
         };
         // b's offset and mask
         let b_offsets: Vec<i64> = (0..=6).collect();
@@ -1187,11 +1151,13 @@ mod tests {
             array_offsets: (0..=6).collect(),
             array_mask: vec![true, true, true, false, false, true],
             max_definition: 2,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Struct(true),
         };
-        let b_levels =
-            a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true, true);
+        let b_levels = a_levels.calculate_child_levels(
+            b_offsets.clone(),
+            b_mask,
+            LevelType::Struct(true),
+        );
         assert_eq!(&b_expected_levels, &b_levels);
 
         // c's offset and mask
@@ -1204,11 +1170,10 @@ mod tests {
             array_offsets: c_offsets.clone(),
             array_mask: vec![true, false, true, false, false, true],
             max_definition: 3,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Struct(true),
         };
         let c_levels =
-            b_levels.calculate_child_levels(c_offsets, c_mask, false, true, true);
+            b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true));
         assert_eq!(&c_expected_levels, &c_levels);
     }
 
@@ -1243,8 +1208,7 @@ mod tests {
             array_offsets: (0..=5).collect(),
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Root,
         };
 
         let batch_level = LevelInfo::new_from_batch(&batch);
@@ -1257,8 +1221,7 @@ mod tests {
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels =
-                    batch_level.calculate_array_levels(array, field, false);
+                let mut array_levels = batch_level.calculate_array_levels(array, field);
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 1);
@@ -1273,16 +1236,14 @@ mod tests {
                 true, true, true, false, true, true, true, true, true, true, true,
             ],
             max_definition: 3,
-            is_list: true,
-            is_nullable: true,
+            level_type: LevelType::Primitive(true),
         };
         assert_eq!(&list_level.definition, &expected_level.definition);
         assert_eq!(&list_level.repetition, &expected_level.repetition);
         assert_eq!(&list_level.array_offsets, &expected_level.array_offsets);
         assert_eq!(&list_level.array_mask, &expected_level.array_mask);
         assert_eq!(&list_level.max_definition, &expected_level.max_definition);
-        assert_eq!(&list_level.is_list, &expected_level.is_list);
-        assert_eq!(&list_level.is_nullable, &expected_level.is_nullable);
+        assert_eq!(&list_level.level_type, &expected_level.level_type);
         assert_eq!(list_level, &expected_level);
     }
 
@@ -1290,8 +1251,6 @@ mod tests {
     fn mixed_struct_list() {
         // this tests the level generation from the equivalent arrow_writer_complex test
 
-        // TODO: Investigate failure if struct is null. See https://github.com/apache/arrow-rs/issues/245
-
         // define schema
         let struct_field_d = Field::new("d", DataType::Float64, true);
         let struct_field_f = Field::new("f", DataType::Float32, true);
@@ -1360,8 +1319,7 @@ mod tests {
             array_offsets: (0..=5).collect(),
             array_mask: vec![true, true, true, true, true],
             max_definition: 0,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Root,
         };
 
         let batch_level = LevelInfo::new_from_batch(&batch);
@@ -1374,8 +1332,7 @@ mod tests {
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels =
-                    batch_level.calculate_array_levels(array, field, false);
+                let mut array_levels = batch_level.calculate_array_levels(array, field);
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 5);
@@ -1389,8 +1346,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![true, true, true, true, true],
             max_definition: 1,
-            is_list: false,
-            is_nullable: false,
+            level_type: LevelType::Primitive(false),
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1403,8 +1359,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![true, false, false, true, true],
             max_definition: 1,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Primitive(true),
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1417,8 +1372,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![false, false, false, true, false],
             max_definition: 2,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Primitive(true),
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1431,8 +1385,7 @@ mod tests {
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![true, false, true, false, true],
             max_definition: 3,
-            is_list: false,
-            is_nullable: true,
+            level_type: LevelType::Primitive(true),
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1445,8 +1398,7 @@ mod tests {
             array_offsets: vec![0, 3, 3, 6],
             array_mask: vec![true, true, true, false, true, true, true],
             max_definition: 3,
-            is_list: true,
-            is_nullable: true,
+            level_type: LevelType::Primitive(true),
         };
 
         let expected = vec![0, 1, 2, 3, 4, 5];
@@ -1476,11 +1428,8 @@ mod tests {
                 .unwrap();
 
         let batch_level = LevelInfo::new_from_batch(&batch);
-        let struct_null_level = batch_level.calculate_array_levels(
-            batch.column(0),
-            batch.schema().field(0),
-            false,
-        );
+        let struct_null_level =
+            batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
 
         // create second batch
         // define schema
@@ -1503,11 +1452,8 @@ mod tests {
                 .unwrap();
 
         let batch_level = LevelInfo::new_from_batch(&batch);
-        let struct_non_null_level = batch_level.calculate_array_levels(
-            batch.column(0),
-            batch.schema().field(0),
-            false,
-        );
+        let struct_non_null_level =
+            batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
 
         // The 2 levels should not be the same
         if struct_non_null_level == struct_null_level {