You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/28 14:07:07 UTC

[GitHub] [arrow] alamb commented on a change in pull request #8792: ARROW-9728: [Rust] [Parquet] Nested definition & repetition for structs

alamb commented on a change in pull request #8792:
URL: https://github.com/apache/arrow/pull/8792#discussion_r532037000



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -423,25 +313,64 @@ fn write_leaf(
     Ok(written as i64)
 }
 
-/// A struct that represents definition and repetition levels.
-/// Repetition levels are only populated if the parent or current leaf is repeated
-#[derive(Debug)]
-struct Levels {
-    definition: Vec<i16>,
-    repetition: Option<Vec<i16>>,
-}
-
 /// Compute nested levels of the Arrow array, recursing into lists and structs
-fn get_levels(
+/// Returns a list of `LevelInfo`, where each level is for nested primitive arrays.
+///
+/// The algorithm works by eagerly incrementing non-null values, and decrementing
+/// when a value is null.
+///
+/// *Examples:*
+///
+/// A record batch always starts at a populated definition = level 1.
+/// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column `a`
+/// can only have a maximum level of 1 if it is not null.
+/// If it is null, we decrement by 1, such that the null slots will = level 0.
+///
+/// If a batch has nested arrays (list, struct, union, etc.), then the incrementing
+/// takes place.
+/// A `<batch<struct[a]<primitive[b]>>` will have up to 2 levels (if nullable).
+/// When calculating levels for `a`, if the struct slot is not empty, we
+/// increment by 1, such that we'd have `[2, 2, 2]` if all 3 slots are not null.
+/// If there is an empty slot, we decrement, leaving us with `[2, 0, 2]` as the
+/// null slot effectively means that no record is populated for the row altogether.
+///
+/// *Lists*
+///
+/// TODO
+///
+/// *Non-nullable arrays*
+///
+/// If an array is non-nullable, this is accounted for when converting the Arrow
+/// schema to a Parquet schema.
+/// When dealing with `<batch<primitive[_]>>` there is no issue, as the meximum
+/// level will always be = 1.
+///
+/// When dealing with nested types, the logic becomes a bit complicate.
+/// A non-nullable struct; `<batch<struct{non-null}[a]<primitive[b]>>>` will only
+/// have 1 maximum level, where 0 means `b` is nul, and 1 means `b` is not null.
+///
+/// We account for the above by checking if the `Field` is nullable, and adjusting
+/// the [inc|dec]rement accordingly.
+fn calculate_array_levels(

Review comment:
       FWIW this seems like it might (eventually) be more naturally part `LevelInfo` (rather than taking a `level_info` parameter)
   
   Definitely something to do in a follow on PR thought

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -608,16 +631,31 @@ fn get_levels(
 /// In the case where the array in question is a child of either a list or struct, the levels
 /// are incremented in accordance with the `level` parameter.
 /// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+///
+/// TODO: (a comment to remove, note to help me reduce the mental bookkeeping)
+/// We want an array's levels to be additive here, i.e. if we have an array that
+/// comes from <batch<primitive>>, we should consume &[0; array.len()], so that
+/// we add values to it, instead of subtract values
+///
+/// An alternaitve is to pass the max level, and use it to compute whether we
+/// should increment (though this is likely tricker)
 fn get_primitive_def_levels(
     array: &arrow_array::ArrayRef,
+    field: &Field,
     parent_def_levels: &[i16],
 ) -> Vec<i16> {
     let mut array_index = 0;
     let max_def_level = parent_def_levels.iter().max().unwrap();
     let mut primitive_def_levels = vec![];
     parent_def_levels.iter().for_each(|def_level| {
-        if def_level < max_def_level {
+        // TODO: if field is non-nullable, can its parent be nullable? Ideally shouldn't

Review comment:
       I wonder if this comment was ever resolved to your satisfaction?

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -521,9 +459,15 @@ fn get_levels(
             // if datatype is a primitive, we can construct levels of the child array
             match child_array.data_type() {
                 // TODO: The behaviour of a <list<null>> is untested
-                ArrowDataType::Null => vec![Levels {
+                ArrowDataType::Null => vec![LevelInfo {
                     definition: list_def_levels,
                     repetition: Some(list_rep_levels),
+                    definition_mask: level_info.definition_mask.clone(), // TODO: list mask

Review comment:
       is a runtime error a better behavior here than a `TODO` comment, just to warn potential users of the 'not yet implemented' status?

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -608,16 +631,31 @@ fn get_levels(
 /// In the case where the array in question is a child of either a list or struct, the levels
 /// are incremented in accordance with the `level` parameter.
 /// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+///
+/// TODO: (a comment to remove, note to help me reduce the mental bookkeeping)

Review comment:
       is this still needed?

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -650,37 +688,62 @@ def_get_binary_array_fn!(get_large_string_array, arrow_array::LargeStringArray);
 /// Get the underlying numeric array slice, skipping any null values.
 /// If there are no null values, it might be quicker to get the slice directly instead of
 /// calling this function.
-fn get_numeric_array_slice<T, A>(array: &arrow_array::PrimitiveArray<A>) -> Vec<T::T>
+fn get_numeric_array_slice<T, A>(
+    array: &arrow_array::PrimitiveArray<A>,
+    indices: &[usize],
+) -> Vec<T::T>
 where
     T: DataType,
     A: arrow::datatypes::ArrowNumericType,
     T::T: From<A::Native>,
 {
-    let mut values = Vec::with_capacity(array.len() - array.null_count());
-    for i in 0..array.len() {
-        if array.is_valid(i) {
-            values.push(array.value(i).into())
-        }
+    let mut values = Vec::with_capacity(indices.len());
+    for i in indices {
+        values.push(array.value(*i).into())
     }
     values
 }
 
+/// Given a level's information, calculate the offsets required to index an array
+/// correctly.
+fn filter_array_indices(level: &LevelInfo) -> Vec<usize> {
+    // TODO: we don't quite get the def levels right all the time, so for now we recalculate it

Review comment:
       I don't really understand this comment, but as written it sounds somewhat concerning (aka it sounds like there is a bug in the level info calculation, and this is trying to correct / workaround it)

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -423,25 +313,64 @@ fn write_leaf(
     Ok(written as i64)
 }
 
-/// A struct that represents definition and repetition levels.
-/// Repetition levels are only populated if the parent or current leaf is repeated
-#[derive(Debug)]
-struct Levels {
-    definition: Vec<i16>,
-    repetition: Option<Vec<i16>>,
-}
-
 /// Compute nested levels of the Arrow array, recursing into lists and structs
-fn get_levels(
+/// Returns a list of `LevelInfo`, where each level is for nested primitive arrays.
+///
+/// The algorithm works by eagerly incrementing non-null values, and decrementing
+/// when a value is null.
+///
+/// *Examples:*
+///
+/// A record batch always starts at a populated definition = level 1.
+/// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column `a`
+/// can only have a maximum level of 1 if it is not null.
+/// If it is null, we decrement by 1, such that the null slots will = level 0.
+///
+/// If a batch has nested arrays (list, struct, union, etc.), then the incrementing
+/// takes place.
+/// A `<batch<struct[a]<primitive[b]>>` will have up to 2 levels (if nullable).
+/// When calculating levels for `a`, if the struct slot is not empty, we
+/// increment by 1, such that we'd have `[2, 2, 2]` if all 3 slots are not null.
+/// If there is an empty slot, we decrement, leaving us with `[2, 0, 2]` as the
+/// null slot effectively means that no record is populated for the row altogether.
+///
+/// *Lists*
+///
+/// TODO
+///
+/// *Non-nullable arrays*
+///
+/// If an array is non-nullable, this is accounted for when converting the Arrow
+/// schema to a Parquet schema.
+/// When dealing with `<batch<primitive[_]>>` there is no issue, as the meximum
+/// level will always be = 1.
+///
+/// When dealing with nested types, the logic becomes a bit complicate.

Review comment:
       ```suggestion
   /// When dealing with nested types, the logic becomes a bit complicated.
   ```

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -544,11 +488,20 @@ fn get_levels(
                 | ArrowDataType::Time64(_)
                 | ArrowDataType::Duration(_)
                 | ArrowDataType::Interval(_) => {
-                    let def_levels =
-                        get_primitive_def_levels(&child_array, &list_def_levels[..]);
-                    vec![Levels {
+                    let def_levels = get_primitive_def_levels(
+                        &child_array,
+                        list_field,
+                        &list_def_levels[..],
+                    );
+                    vec![LevelInfo {
                         definition: def_levels,
                         repetition: Some(list_rep_levels),
+                        array_mask: vec![],
+                        array_offsets: vec![],
+                        definition_mask: vec![],
+                        is_list: true,
+                        is_nullable: list_field.is_nullable(),
+                        max_definition: level + 1, // TODO: update

Review comment:
       Does this really need to be updated? It seems like the time/duration/interval types can be treated as primitives for the null calculation

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -855,13 +937,14 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "list support is incomplete"]
     fn arrow_writer_complex() {
         // define schema
         let struct_field_d = Field::new("d", DataType::Float64, true);
         let struct_field_f = Field::new("f", DataType::Float32, true);
         let struct_field_g = Field::new(
             "g",
-            DataType::List(Box::new(Field::new("items", DataType::Int16, false))),

Review comment:
       FYIW the PR from @ch-sc in https://github.com/apache/arrow/pull/8715 will almost certainly conflict with this PR

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -912,23 +995,77 @@ mod tests {
 
         // build a record batch
         let batch = RecordBatch::try_new(
-            Arc::new(schema.clone()),
+            Arc::new(schema),
             vec![Arc::new(a), Arc::new(b), Arc::new(c)],
         )
         .unwrap();
 
-        let props = WriterProperties::builder()
-            .set_key_value_metadata(Some(vec![KeyValue {
-                key: "test_key".to_string(),
-                value: Some("test_value".to_string()),
-            }]))
+        roundtrip("test_arrow_writer_complex.parquet", batch);
+    }
+
+    #[test]
+    fn arrow_writer_2_level_struct() {
+        // tests writing <struct<struct<primitive>>
+        let field_c = Field::new("c", DataType::Int32, true);
+        let field_b = Field::new("b", DataType::Struct(vec![field_c]), true);
+        let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), true);
+        let schema = Schema::new(vec![field_a.clone()]);
+
+        // create data
+        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
+        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
+            .len(6)
+            .null_bit_buffer(Buffer::from(vec![0b00100111]))
+            .add_child_data(c.data())
+            .build();
+        let b = StructArray::from(b_data);
+        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
+            .len(6)
+            .null_bit_buffer(Buffer::from(vec![0b00101111]))
+            .add_child_data(b.data())
             .build();
+        let a = StructArray::from(a_data);
 
-        let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
-        let mut writer =
-            ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
+        assert_eq!(a.null_count(), 1);
+        assert_eq!(a.column(0).null_count(), 2);
+
+        // build a racord batch
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+        roundtrip("test_arrow_writer_2_level_struct.parquet", batch);
+    }
+
+    #[test]
+    #[ignore = "waiting on inheritance of nested structs, ARROW-10684"]
+    fn arrow_writer_2_level_struct_non_null() {

Review comment:
       is it also worth testing with all fields non-nullable?

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -757,6 +820,25 @@ mod tests {
 
     #[test]
     #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"]

Review comment:
       this `ignore` probably should be removed in this PR

##########
File path: rust/parquet/src/arrow/levels.rs
##########
@@ -0,0 +1,692 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains the logic for computing definition and repetition levels
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct LevelInfo {
+    /// Array's definition levels
+    pub definition: Vec<i16>,
+    /// Array's optional repetition levels
+    pub repetition: Option<Vec<i16>>,
+    /// Definition mask, to indicate null ListArray slots that should be skipped
+    pub definition_mask: Vec<(bool, i16)>,

Review comment:
       I don't understand how `definition` and `definition_mask` differ...

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -176,87 +194,17 @@ fn write_leaves(
             }
             Ok(())
         }
-        ArrowDataType::Dictionary(key_type, value_type) => {
-            use arrow_array::{PrimitiveArray, StringArray};
-            use ArrowDataType::*;
-            use ColumnWriter::*;
+        ArrowDataType::Dictionary(_, value_type) => {
+            // cast dictionary to a primitive
+            let array = arrow::compute::cast(array, value_type)?;

Review comment:
       We don't yet have any dictionary array benchmarks in IOx yet (as we haven't yet hooked up the array writer -- that is planned 🔜 ). 
   
   I defer to @carols10cents  on the intent of the dictionary code here thought, as she did all the work.

##########
File path: rust/parquet/src/arrow/levels.rs
##########
@@ -0,0 +1,692 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains the logic for computing definition and repetition levels
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct LevelInfo {
+    /// Array's definition levels
+    pub definition: Vec<i16>,
+    /// Array's optional repetition levels
+    pub repetition: Option<Vec<i16>>,
+    /// Definition mask, to indicate null ListArray slots that should be skipped
+    pub definition_mask: Vec<(bool, i16)>,
+    /// Array's offsets, 64-bit is used to accommodate large offset arrays
+    pub array_offsets: Vec<i64>,
+    /// Array's validity mask
+    pub array_mask: Vec<bool>,
+    /// The maximum definition at this level, 0 at the root (record batch) [TODO: the 0 might be inaccurate]
+    pub max_definition: i16,
+    /// Whether this array or any of its parents is a list
+    pub is_list: bool,
+    /// Whether the array is nullable (affects definition levels)
+    pub is_nullable: bool,
+}
+
+impl LevelInfo {
+    fn calculate_child_levels(
+        &self,
+        array_offsets: Vec<i64>,
+        array_mask: Vec<bool>,
+        is_list: bool,
+        is_nullable: bool,
+        current_def_level: i16,
+    ) -> Self {
+        let mut definition = vec![];
+        let mut repetition = vec![];
+        let mut definition_mask = vec![];
+        let has_repetition = self.is_list || is_list;
+
+        // keep track of parent definition nulls seen through the definition_mask
+        let mut nulls_seen = 0;
+
+        // push any initial array slots that are null
+        while !self.definition_mask[nulls_seen].0
+            && self.definition_mask[nulls_seen].1 + 2 < current_def_level
+        {
+            definition_mask.push(self.definition_mask[nulls_seen]);
+            definition.push(self.definition[nulls_seen]);
+            repetition.push(0); // TODO is it always 0?
+            nulls_seen += 1;
+            println!("Definition length e: {}", definition.len());
+        }
+
+        // we use this index to determine if a repetition should be populated based
+        // on its definition at the index. It needs to be outside of the loop
+        let mut def_index = 0;
+
+        self.array_offsets.windows(2).for_each(|w| {
+        // the parent's index allows us to iterate through its offsets and the child's
+        let from = w[0] as usize;
+        let to = w[1] as usize;
+        // dbg!((from, to));
+        // if the parent slot is empty, fill it once to show the nullness
+        if from == to {
+            definition.push(self.max_definition - 1);
+            repetition.push(0);
+            definition_mask.push((false, self.max_definition - 1));
+            println!("Definition length d: {}", definition.len());
+        }
+
+        (from..to).for_each(|index| {
+            println!(
+                "Array level: {}, parent offset: {}",
+                current_def_level, index
+            );
+            let parent_mask = &self.definition_mask[index + nulls_seen];
+            // TODO: this might need to be < instead of ==, but we generate duplicates in that case
+            if !parent_mask.0 && parent_mask.1 == current_def_level {
+                println!("Parent mask c: {:?}", parent_mask);
+                nulls_seen += 1;
+                definition.push(self.max_definition);
+                repetition.push(1);
+                definition_mask.push(*parent_mask);
+                println!("Definition length c: {}", definition.len());
+            }
+            let mask = array_mask[index];
+            let array_from = array_offsets[index];
+            let array_to = array_offsets[index + 1];
+
+            let parent_def_level = &self.definition[index + nulls_seen];
+
+            // if array_len == 0, the child is null
+            let array_len = array_to - array_from;
+
+            // compute the definition level
+            // what happens if array's len is 0?
+            if array_len == 0 {
+                definition.push(self.max_definition);
+                repetition.push(0); // TODO: validate that this is 0 for deeply nested lists
+                definition_mask.push((false, current_def_level));
+                println!("Definition length b: {}", definition.len());
+            }
+            (array_from..array_to).for_each(|_| {
+                definition.push(if *parent_def_level == self.max_definition {
+                    // TODO: haven't validated this in deeply-nested lists
+                    self.max_definition + mask as i16
+                } else {
+                    *parent_def_level
+                });
+                definition_mask.push((true, current_def_level));
+                println!("Definition length a: {}", definition.len());
+            });
+
+            // 11-11-2020 (23:57GMT)
+            // we are pushing defined repetitions even if a definition is < max
+            // I had initially separated the repetition logic here so that I
+            // don't perform a `has_repetition` check on each loop.
+            // The downside's that I now need to index into `definitions` so I
+            // can check if a value is defined or not.
+
+            if has_repetition && array_len > 0 {
+                // compute the repetition level
+
+                // dbg!(&definition);
+                // dbg!(current_def_level, parent_level.max_definition);
+                // dbg!(&parent_level.repetition);
+                match &self.repetition {
+                    Some(rep) => {
+                        let parent_rep = rep[index];
+                        // TODO(11/11/2020) need correct variable to mask repetitions correctly
+                        if definition[def_index] == current_def_level {
+                            repetition.push(parent_rep);
+                            println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level);
+                            dbg!(&repetition);
+                            def_index += 1;
+                            (1..array_len).for_each(|_| {
+                                println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level);
+                                repetition.push(current_def_level); // was parent_rep + 1
+                                def_index += 1;
+                            });
+                        } else {
+                            (0..array_len).for_each(|_| {
+                                println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level);
+                                repetition.push(0); // TODO: should it be anything else?
+                                // TODO: use an append instead of pushes
+                                def_index += 1;
+                            });
+                        }
+                    }
+                    None => {
+                        println!("+ Index {} definition is {}, and repetition is 0. Current def: {}", def_index, definition[def_index], current_def_level);
+                        // if definition[def_index] == current_def_level {
+                            repetition.push(0);
+                            def_index += 1;
+                            (1..array_len).for_each(|_| {
+                                repetition.push(1); // TODO: is it always 0 and 1?
+                                def_index += 1;
+                            });
+                        // } else {
+                        //     (0..array_len).for_each(|_| {
+                        //         repetition.push(0); // TODO: should it be anything else?
+                        //                             // TODO: use an append instead of pushes
+                        //         def_index += 1;
+                        //     });
+                        // }
+                    }
+                }
+            }
+        });
+    });
+
+        let lev = LevelInfo {
+            definition,
+            repetition: if !has_repetition {
+                None
+            } else {
+                Some(repetition)
+            },
+            definition_mask,
+            array_mask,
+            array_offsets,
+            is_list: has_repetition,
+            max_definition: current_def_level,
+            is_nullable,
+        };
+
+        println!("done");
+
+        lev
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_calculate_array_levels_twitter_example() {
+        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
+        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
+        let parent_levels = LevelInfo {
+            definition: vec![0, 0],
+            repetition: None,
+            definition_mask: vec![(true, 1), (true, 1)],
+            array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
+            array_mask: vec![true, true], // both lists defined
+            max_definition: 0,            // at the root, set to 0
+            is_list: false,               // root is never list
+            is_nullable: false,           // root in example is non-nullable
+        };
+        // offset into array, each level1 has 2 values
+        let array_offsets = vec![0, 2, 4];
+        let array_mask = vec![true, true];
+
+        // calculate level1 levels
+        let levels = parent_levels.calculate_child_levels(
+            array_offsets.clone(),
+            array_mask.clone(),
+            true,
+            false,
+            1,
+        );
+        //
+        let expected_levels = LevelInfo {
+            definition: vec![1, 1, 1, 1],
+            repetition: Some(vec![0, 1, 0, 1]),
+            definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1)],
+            array_offsets,
+            array_mask,
+            max_definition: 1,
+            is_list: true,
+            is_nullable: false,
+        };
+        assert_eq!(levels, expected_levels);
+
+        // level2
+        let parent_levels = levels;
+        let array_offsets = vec![0, 3, 7, 8, 10];
+        let array_mask = vec![true, true, true, true];
+        let levels = parent_levels.calculate_child_levels(
+            array_offsets.clone(),
+            array_mask.clone(),
+            true,
+            false,
+            2,
+        );
+        let expected_levels = LevelInfo {
+            definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
+            repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
+            definition_mask: vec![
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+                (true, 2),
+            ],
+            array_offsets,
+            array_mask,
+            max_definition: 2,
+            is_list: true,
+            is_nullable: false,
+        };
+        assert_eq!(&levels, &expected_levels);
+    }
+
+    #[test]
+    fn test_calculate_one_level_1() {
+        // This test calculates the levels for a non-null primitive array
+        let parent_levels = LevelInfo {
+            definition: vec![1; 10],
+            repetition: None,
+            definition_mask: vec![(true, 1); 10],
+            array_offsets: (0..=10).collect(),
+            array_mask: vec![true; 10],
+            max_definition: 0,
+            is_list: false,
+            is_nullable: false,
+        };
+        let array_offsets: Vec<i64> = (0..=10).collect();
+        let array_mask = vec![true; 10];
+
+        let levels = parent_levels.calculate_child_levels(
+            array_offsets.clone(),
+            array_mask.clone(),
+            false,
+            false,
+            1,
+        );
+        let expected_levels = LevelInfo {
+            definition: vec![1; 10],
+            repetition: None,
+            definition_mask: vec![(true, 1); 10],
+            array_offsets,
+            array_mask,
+            max_definition: 1,
+            is_list: false,
+            is_nullable: false,
+        };
+        assert_eq!(&levels, &expected_levels);
+    }
+
+    #[test]
+    #[ignore]

Review comment:
       why `#ignore`?

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -608,16 +631,31 @@ fn get_levels(
 /// In the case where the array in question is a child of either a list or struct, the levels
 /// are incremented in accordance with the `level` parameter.
 /// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+///
+/// TODO: (a comment to remove, note to help me reduce the mental bookkeeping)
+/// We want an array's levels to be additive here, i.e. if we have an array that
+/// comes from <batch<primitive>>, we should consume &[0; array.len()], so that
+/// we add values to it, instead of subtract values
+///
+/// An alternaitve is to pass the max level, and use it to compute whether we
+/// should increment (though this is likely tricker)
 fn get_primitive_def_levels(
     array: &arrow_array::ArrayRef,
+    field: &Field,
     parent_def_levels: &[i16],
 ) -> Vec<i16> {
     let mut array_index = 0;
     let max_def_level = parent_def_levels.iter().max().unwrap();
     let mut primitive_def_levels = vec![];
     parent_def_levels.iter().for_each(|def_level| {
-        if def_level < max_def_level {
+        // TODO: if field is non-nullable, can its parent be nullable? Ideally shouldn't

Review comment:
       I would think that even if a field was non-nullable, its parent could indeed be nullable.

##########
File path: rust/parquet/src/arrow/levels.rs
##########
@@ -0,0 +1,692 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains the logic for computing definition and repetition levels
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct LevelInfo {
+    /// Array's definition levels
+    pub definition: Vec<i16>,
+    /// Array's optional repetition levels
+    pub repetition: Option<Vec<i16>>,
+    /// Definition mask, to indicate null ListArray slots that should be skipped
+    pub definition_mask: Vec<(bool, i16)>,
+    /// Array's offsets, 64-bit is used to accommodate large offset arrays
+    pub array_offsets: Vec<i64>,
+    /// Array's validity mask
+    pub array_mask: Vec<bool>,
+    /// The maximum definition at this level, 0 at the root (record batch) [TODO: the 0 might be inaccurate]
+    pub max_definition: i16,
+    /// Whether this array or any of its parents is a list
+    pub is_list: bool,
+    /// Whether the array is nullable (affects definition levels)
+    pub is_nullable: bool,
+}
+
+impl LevelInfo {
+    fn calculate_child_levels(
+        &self,
+        array_offsets: Vec<i64>,
+        array_mask: Vec<bool>,
+        is_list: bool,
+        is_nullable: bool,
+        current_def_level: i16,
+    ) -> Self {
+        let mut definition = vec![];
+        let mut repetition = vec![];
+        let mut definition_mask = vec![];
+        let has_repetition = self.is_list || is_list;
+
+        // keep track of parent definition nulls seen through the definition_mask
+        let mut nulls_seen = 0;
+
+        // push any initial array slots that are null
+        while !self.definition_mask[nulls_seen].0
+            && self.definition_mask[nulls_seen].1 + 2 < current_def_level
+        {
+            definition_mask.push(self.definition_mask[nulls_seen]);
+            definition.push(self.definition[nulls_seen]);
+            repetition.push(0); // TODO is it always 0?
+            nulls_seen += 1;
+            println!("Definition length e: {}", definition.len());
+        }
+
+        // we use this index to determine if a repetition should be populated based
+        // on its definition at the index. It needs to be outside of the loop
+        let mut def_index = 0;
+
+        self.array_offsets.windows(2).for_each(|w| {
+        // the parent's index allows us to iterate through its offsets and the child's
+        let from = w[0] as usize;
+        let to = w[1] as usize;
+        // dbg!((from, to));
+        // if the parent slot is empty, fill it once to show the nullness
+        if from == to {
+            definition.push(self.max_definition - 1);
+            repetition.push(0);
+            definition_mask.push((false, self.max_definition - 1));
+            println!("Definition length d: {}", definition.len());
+        }
+
+        (from..to).for_each(|index| {
+            println!(
+                "Array level: {}, parent offset: {}",
+                current_def_level, index
+            );
+            let parent_mask = &self.definition_mask[index + nulls_seen];
+            // TODO: this might need to be < instead of ==, but we generate duplicates in that case
+            if !parent_mask.0 && parent_mask.1 == current_def_level {
+                println!("Parent mask c: {:?}", parent_mask);
+                nulls_seen += 1;
+                definition.push(self.max_definition);
+                repetition.push(1);
+                definition_mask.push(*parent_mask);
+                println!("Definition length c: {}", definition.len());
+            }
+            let mask = array_mask[index];
+            let array_from = array_offsets[index];
+            let array_to = array_offsets[index + 1];
+
+            let parent_def_level = &self.definition[index + nulls_seen];
+
+            // if array_len == 0, the child is null
+            let array_len = array_to - array_from;
+
+            // compute the definition level
+            // what happens if array's len is 0?
+            if array_len == 0 {
+                definition.push(self.max_definition);
+                repetition.push(0); // TODO: validate that this is 0 for deeply nested lists
+                definition_mask.push((false, current_def_level));
+                println!("Definition length b: {}", definition.len());
+            }
+            (array_from..array_to).for_each(|_| {
+                definition.push(if *parent_def_level == self.max_definition {
+                    // TODO: haven't validated this in deeply-nested lists
+                    self.max_definition + mask as i16
+                } else {
+                    *parent_def_level
+                });
+                definition_mask.push((true, current_def_level));
+                println!("Definition length a: {}", definition.len());
+            });
+
+            // 11-11-2020 (23:57GMT)
+            // we are pushing defined repetitions even if a definition is < max
+            // I had initially separated the repetition logic here so that I
+            // don't perform a `has_repetition` check on each loop.
+            // The downside's that I now need to index into `definitions` so I
+            // can check if a value is defined or not.
+
+            if has_repetition && array_len > 0 {
+                // compute the repetition level
+
+                // dbg!(&definition);
+                // dbg!(current_def_level, parent_level.max_definition);
+                // dbg!(&parent_level.repetition);
+                match &self.repetition {
+                    Some(rep) => {
+                        let parent_rep = rep[index];
+                        // TODO(11/11/2020) need correct variable to mask repetitions correctly
+                        if definition[def_index] == current_def_level {
+                            repetition.push(parent_rep);
+                            println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level);
+                            dbg!(&repetition);
+                            def_index += 1;
+                            (1..array_len).for_each(|_| {
+                                println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level);
+                                repetition.push(current_def_level); // was parent_rep + 1
+                                def_index += 1;
+                            });
+                        } else {
+                            (0..array_len).for_each(|_| {
+                                println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level);
+                                repetition.push(0); // TODO: should it be anything else?
+                                // TODO: use an append instead of pushes
+                                def_index += 1;
+                            });
+                        }
+                    }
+                    None => {
+                        println!("+ Index {} definition is {}, and repetition is 0. Current def: {}", def_index, definition[def_index], current_def_level);
+                        // if definition[def_index] == current_def_level {
+                            repetition.push(0);
+                            def_index += 1;
+                            (1..array_len).for_each(|_| {
+                                repetition.push(1); // TODO: is it always 0 and 1?
+                                def_index += 1;
+                            });
+                        // } else {
+                        //     (0..array_len).for_each(|_| {
+                        //         repetition.push(0); // TODO: should it be anything else?
+                        //                             // TODO: use an append instead of pushes
+                        //         def_index += 1;
+                        //     });
+                        // }
+                    }
+                }
+            }
+        });
+    });
+
+        let lev = LevelInfo {
+            definition,
+            repetition: if !has_repetition {
+                None
+            } else {
+                Some(repetition)
+            },
+            definition_mask,
+            array_mask,
+            array_offsets,
+            is_list: has_repetition,
+            max_definition: current_def_level,
+            is_nullable,
+        };
+
+        println!("done");
+
+        lev
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_calculate_array_levels_twitter_example() {
+        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html

Review comment:
       👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org