You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/07/03 06:46:50 UTC

[arrow-rs] branch master updated: Fix parquet definition levels (#511)

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 83ad35c  Fix parquet definition levels (#511)
83ad35c is described below

commit 83ad35c1e022f90e17fedfc6cfe2242d62849ad5
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Sat Jul 3 08:46:40 2021 +0200

    Fix parquet definition levels (#511)
    
    * Fix parquet definition levels
    
    - non-null primitives should have a definition level of 0; we were misinterpreting the spec
    - a list increments the definition level by 1 if it is non-nullable, or by 2 if it is nullable
    
    This fixes these issues, and updates the tests
    
    * roundtrip tests
---
 parquet/src/arrow/arrow_writer.rs | 152 ++++++++++++++++++--------------------
 parquet/src/arrow/levels.rs       |  94 +++++++++++------------
 2 files changed, 118 insertions(+), 128 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 69ebce6..03868a3 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -563,8 +563,8 @@ fn get_fsb_array_slice(
 mod tests {
     use super::*;
 
+    use std::fs::File;
     use std::sync::Arc;
-    use std::{fs::File, io::Seek};
 
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
@@ -592,16 +592,11 @@ mod tests {
         let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
 
         // build a record batch
-        let batch = RecordBatch::try_new(
-            Arc::new(schema.clone()),
-            vec![Arc::new(a), Arc::new(b)],
-        )
-        .unwrap();
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
+                .unwrap();
 
-        let file = get_temp_file("test_arrow_writer.parquet", &[]);
-        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
+        roundtrip("test_arrow_write.parquet", batch, Some(SMALL_SIZE / 2));
     }
 
     #[test]
@@ -660,13 +655,13 @@ mod tests {
         let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
 
         // build a record batch
-        let batch =
-            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 
-        let file = get_temp_file("test_arrow_writer_non_null.parquet", &[]);
-        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
+        roundtrip(
+            "test_arrow_writer_non_null.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -674,8 +669,8 @@ mod tests {
         // define schema
         let schema = Schema::new(vec![Field::new(
             "a",
-            DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
-            false,
+            DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
+            true,
         )]);
 
         // create some data
@@ -690,7 +685,7 @@ mod tests {
         let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
             "item",
             DataType::Int32,
-            true,
+            false,
         ))))
         .len(5)
         .add_buffer(a_value_offsets)
@@ -700,16 +695,52 @@ mod tests {
         let a = ListArray::from(a_list_data);
 
         // build a record batch
-        let batch =
-            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 
-        // I think this setup is incorrect because this should pass
         assert_eq!(batch.column(0).data().null_count(), 1);
 
-        let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
-        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
+        // This test fails if the max row group size is less than the batch's length
+        // see https://github.com/apache/arrow-rs/issues/518
+        roundtrip("test_arrow_writer_list.parquet", batch, None);
+    }
+
+    #[test]
+    fn arrow_writer_list_non_null() {
+        // define schema
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
+            false,
+        )]);
+
+        // create some data
+        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+
+        // Construct a buffer for value offsets, for the nested array:
+        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
+        let a_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+        // Construct a list array from the above two
+        let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
+            "item",
+            DataType::Int32,
+            false,
+        ))))
+        .len(5)
+        .add_buffer(a_value_offsets)
+        .add_child_data(a_values.data().clone())
+        .build();
+        let a = ListArray::from(a_list_data);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+        // This test fails if the max row group size is less than the batch's length
+        // see https://github.com/apache/arrow-rs/issues/518
+        assert_eq!(batch.column(0).data().null_count(), 0);
+
+        roundtrip("test_arrow_writer_list_non_null.parquet", batch, None);
     }
 
     #[test]
@@ -733,39 +764,16 @@ mod tests {
         let string_values = StringArray::from(raw_string_values.clone());
         let binary_values = BinaryArray::from(raw_binary_value_refs);
         let batch = RecordBatch::try_new(
-            Arc::new(schema.clone()),
+            Arc::new(schema),
             vec![Arc::new(string_values), Arc::new(binary_values)],
         )
         .unwrap();
 
-        let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
-        let mut writer =
-            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
-                .unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        file.seek(std::io::SeekFrom::Start(0)).unwrap();
-        let file_reader = SerializedFileReader::new(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
-
-        let batch = record_batch_reader.next().unwrap().unwrap();
-        let string_col = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        let binary_col = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<BinaryArray>()
-            .unwrap();
-
-        for i in 0..batch.num_rows() {
-            assert_eq!(string_col.value(i), raw_string_values[i]);
-            assert_eq!(binary_col.value(i), raw_binary_values[i].as_slice());
-        }
+        roundtrip(
+            "test_arrow_writer_binary.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
@@ -779,36 +787,16 @@ mod tests {
         dec_builder.append_value(0).unwrap();
         dec_builder.append_value(-100).unwrap();
 
-        let raw_decimal_i128_values: Vec<i128> = vec![10_000, 50_000, 0, -100];
         let decimal_values = dec_builder.finish();
-        let batch = RecordBatch::try_new(
-            Arc::new(schema.clone()),
-            vec![Arc::new(decimal_values)],
-        )
-        .unwrap();
-
-        let mut file = get_temp_file("test_arrow_writer_decimal.parquet", &[]);
-        let mut writer =
-            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)])
                 .unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        file.seek(std::io::SeekFrom::Start(0)).unwrap();
-        let file_reader = SerializedFileReader::new(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
-        let batch = record_batch_reader.next().unwrap().unwrap();
-        let decimal_col = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<DecimalArray>()
-            .unwrap();
-
-        for i in 0..batch.num_rows() {
-            assert_eq!(decimal_col.value(i), raw_decimal_i128_values[i]);
-        }
+        roundtrip(
+            "test_arrow_writer_decimal.parquet",
+            batch,
+            Some(SMALL_SIZE / 2),
+        );
     }
 
     #[test]
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 2e95039..0af0f9e 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -86,9 +86,11 @@ impl LevelType {
     const fn level_increment(&self) -> i16 {
         match self {
             LevelType::Root => 0,
-            LevelType::List(is_nullable)
-            | LevelType::Struct(is_nullable)
-            | LevelType::Primitive(is_nullable) => *is_nullable as i16,
+            // List repetition adds a constant 1
+            LevelType::List(is_nullable) => 1 + *is_nullable as i16,
+            LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
+                *is_nullable as i16
+            }
         }
     }
 }
@@ -334,37 +336,27 @@ impl LevelInfo {
         let mut merged_array_mask = Vec::with_capacity(min_len);
 
         let max_definition = match (self.level_type, level_type) {
-            (LevelType::Root, LevelType::Struct(is_nullable)) => {
-                // If the struct is non-nullable, its def level doesn't increment
-                is_nullable as i16
-            }
-            (LevelType::Root, _) => 1,
+            // Handle the illegal cases
             (_, LevelType::Root) => {
                 unreachable!("Cannot have a root as a child")
             }
-            (LevelType::List(_), _) => {
-                self.max_definition + 1 + level_type.level_increment()
-            }
-            (LevelType::Struct(_), _) => {
-                self.max_definition + level_type.level_increment()
-            }
-            (_, LevelType::List(is_nullable)) => {
-                // if the child is a list, even if its parent is a root
-                self.max_definition + 1 + is_nullable as i16
-            }
             (LevelType::Primitive(_), _) => {
                 unreachable!("Cannot have a primitive parent for any type")
             }
+            // The general case
+            (_, _) => self.max_definition + level_type.level_increment(),
         };
 
         match (self.level_type, level_type) {
             (LevelType::List(_), LevelType::List(is_nullable)) => {
-                // parent is a list or descendant of a list, and child is a list
+                // Parent is a list or descendant of a list, and child is a list
                 let reps = self.repetition.clone().unwrap();
-                // Calculate the 2 list hierarchy definitions in advance
-                // List is not empty, but null
-                let l2 = max_definition - is_nullable as i16;
-                // List is not empty, and not null
+
+                // List is null, and not empty
+                let l1 = max_definition - is_nullable as i16;
+                // List is not null, but is empty
+                let l2 = max_definition - 1;
+                // List is not null, and not empty
                 let l3 = max_definition;
 
                 let mut nulls_seen = 0;
@@ -399,7 +391,9 @@ impl LevelInfo {
                             let merged_mask = parent_mask && child_mask;
 
                             if child_len == 0 {
-                                definition.push(parent_def);
+                                // Empty slot, i.e. {"parent": {"child": [] } }
+                                // Nullness takes priority over emptiness
+                                definition.push(if child_mask { l2 } else { l1 });
                                 repetition.push(parent_rep);
                                 merged_array_mask.push(merged_mask);
                             } else {
@@ -419,7 +413,7 @@ impl LevelInfo {
                                     } else if child_mask {
                                         l3
                                     } else {
-                                        l2
+                                        l1
                                     });
                                     repetition.push(rep);
                                     merged_array_mask.push(merged_mask);
@@ -506,9 +500,11 @@ impl LevelInfo {
                 // Encountering a list for the first time.
                 // Calculate the 2 list hierarchy definitions in advance
 
-                // List is not empty, but null (if nullable)
-                let l2 = max_definition - is_nullable as i16;
-                // List is not empty, and not null
+                // List is null, and not empty
+                let l1 = max_definition - 1 - is_nullable as i16;
+                // List is not null, but is empty
+                let l2 = max_definition - 1;
+                // List is not null, and not empty
                 let l3 = max_definition;
 
                 self.definition
@@ -523,20 +519,24 @@ impl LevelInfo {
 
                         match (parent_mask, child_len) {
                             (true, 0) => {
-                                // empty slot that is valid, i.e. {"parent": {"child": [] } }
-                                definition.push(if child_mask { l3 } else { l2 });
+                                // Empty slot, i.e. {"parent": {"child": [] } }
+                                // Nullness takes priority over emptiness
+                                definition.push(if child_mask { l2 } else { l1 });
                                 repetition.push(0);
                                 merged_array_mask.push(child_mask);
                             }
                             (false, 0) => {
+                                // Inherit the parent definition as parent was null
                                 definition.push(*def);
                                 repetition.push(0);
                                 merged_array_mask.push(child_mask);
                             }
                             (true, _) => {
                                 (child_from..child_to).for_each(|child_index| {
-                                    definition.push(if child_mask { l3 } else { l2 });
-                                    // mark the first child slot as 0, and the next as 1
+                                    // l1 and l3 make sense as list is not empty,
+                                    // but we reflect that it's either null or not
+                                    definition.push(if child_mask { l3 } else { l1 });
+                                    // Mark the first child slot as 0, and the next as 1
                                     repetition.push(if child_index == child_from {
                                         0
                                     } else {
@@ -547,6 +547,7 @@ impl LevelInfo {
                             }
                             (false, _) => {
                                 (child_from..child_to).for_each(|child_index| {
+                                    // Inherit the parent definition as parent was null
                                     definition.push(*def);
                                     // mark the first child slot as 0, and the next as 1
                                     repetition.push(if child_index == child_from {
@@ -867,11 +868,12 @@ mod tests {
             LevelType::Primitive(false),
         );
         let expected_levels = LevelInfo {
-            definition: vec![1; 10],
+            // As it is non-null, definitions can be omitted
+            definition: vec![0; 10],
             repetition: None,
             array_offsets,
             array_mask,
-            max_definition: 1,
+            max_definition: 0,
             level_type: LevelType::Primitive(false),
             offset: 0,
             length: 10,
@@ -948,13 +950,13 @@ mod tests {
             // - Calculate the level at the list
             // - Calculate the level at the list's child
             // We do not do this in these tests, thus the levels are 1 less.
-            definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
+            definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
             repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
             array_offsets,
             array_mask: vec![
                 true, true, false, true, true, true, true, true, true, true, true, true,
             ],
-            max_definition: 1,
+            max_definition: 2,
             level_type: LevelType::List(true),
             offset: 0,
             length: 11, // the child has 11 slots
@@ -1006,14 +1008,14 @@ mod tests {
             // 2 3 [4] are 0
             // 4 5 6 7 [8] are 1 (defined at level 1 only)
             // 8 9 10 [11] are 2 (defined at both levels)
-            definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
+            definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
             repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
             array_offsets,
             array_mask: vec![
                 false, false, false, false, false, true, true, true, true, true, true,
                 true,
             ],
-            max_definition: 2,
+            max_definition: 3,
             level_type: LevelType::List(true),
             offset: 0,
             length: 11,
@@ -1061,7 +1063,7 @@ mod tests {
             // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
             // 4: [[116, 117], [118, 119], [120, 121]]
             definition: vec![
-                0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
+                0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
             ],
             repetition: Some(vec![
                 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
@@ -1072,7 +1074,7 @@ mod tests {
                 true, true, true, true, true, true, true, true, true, true, true, true,
                 true,
             ],
-            max_definition: 4,
+            max_definition: 5,
             level_type: LevelType::List(true),
             offset: 0,
             length: 22,
@@ -1121,11 +1123,11 @@ mod tests {
         // 2: [4, 5]
         // 3: [6, 7]
         let expected_levels = LevelInfo {
-            definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
+            definition: vec![1, 3, 3, 3, 3, 3, 3, 3],
             repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
             array_offsets,
             array_mask: vec![false, true, true, true, true, true, true, true],
-            max_definition: 2,
+            max_definition: 3,
             level_type: LevelType::List(true),
             offset: 0,
             length: 8,
@@ -1167,14 +1169,14 @@ mod tests {
         // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
         // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
         let expected_levels = LevelInfo {
-            definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
+            definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5],
             repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
             array_mask: vec![
                 false, true, true, true, false, true, true, true, true, true, true, true,
                 true, true, true, true, true, true,
             ],
             array_offsets,
-            max_definition: 4,
+            max_definition: 5,
             level_type: LevelType::List(true),
             offset: 0,
             length: 16,
@@ -1416,11 +1418,11 @@ mod tests {
         let list_level = levels.get(0).unwrap();
 
         let expected_level = LevelInfo {
-            definition: vec![1, 1, 1, 1, 1],
+            definition: vec![0, 0, 0, 0, 0],
             repetition: None,
             array_offsets: vec![0, 1, 2, 3, 4, 5],
             array_mask: vec![true, true, true, true, true],
-            max_definition: 1,
+            max_definition: 0,
             level_type: LevelType::Primitive(false),
             offset: 0,
             length: 5,