You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/07/03 06:46:50 UTC
[arrow-rs] branch master updated: Fix parquet definition levels (#511)
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 83ad35c Fix parquet definition levels (#511)
83ad35c is described below
commit 83ad35c1e022f90e17fedfc6cfe2242d62849ad5
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Sat Jul 3 08:46:40 2021 +0200
Fix parquet definition levels (#511)
* Fix parquet definition levels
- A non-nullable primitive should have a definition level of 0; the spec was previously being misinterpreted
- A list increments the definition level by 1 if it is non-nullable, or by 2 if it is nullable
This fixes these issues and updates the tests accordingly.
* roundtrip tests
---
parquet/src/arrow/arrow_writer.rs | 152 ++++++++++++++++++--------------------
parquet/src/arrow/levels.rs | 94 +++++++++++------------
2 files changed, 118 insertions(+), 128 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 69ebce6..03868a3 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -563,8 +563,8 @@ fn get_fsb_array_slice(
mod tests {
use super::*;
+ use std::fs::File;
use std::sync::Arc;
- use std::{fs::File, io::Seek};
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
@@ -592,16 +592,11 @@ mod tests {
let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
// build a record batch
- let batch = RecordBatch::try_new(
- Arc::new(schema.clone()),
- vec![Arc::new(a), Arc::new(b)],
- )
- .unwrap();
+ let batch =
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
+ .unwrap();
- let file = get_temp_file("test_arrow_writer.parquet", &[]);
- let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
+ roundtrip("test_arrow_write.parquet", batch, Some(SMALL_SIZE / 2));
}
#[test]
@@ -660,13 +655,13 @@ mod tests {
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
// build a record batch
- let batch =
- RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
- let file = get_temp_file("test_arrow_writer_non_null.parquet", &[]);
- let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
+ roundtrip(
+ "test_arrow_writer_non_null.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -674,8 +669,8 @@ mod tests {
// define schema
let schema = Schema::new(vec![Field::new(
"a",
- DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
- false,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
+ true,
)]);
// create some data
@@ -690,7 +685,7 @@ mod tests {
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
- true,
+ false,
))))
.len(5)
.add_buffer(a_value_offsets)
@@ -700,16 +695,52 @@ mod tests {
let a = ListArray::from(a_list_data);
// build a record batch
- let batch =
- RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
- // I think this setup is incorrect because this should pass
assert_eq!(batch.column(0).data().null_count(), 1);
- let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
- let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
+ // This test fails if the max row group size is less than the batch's length
+ // see https://github.com/apache/arrow-rs/issues/518
+ roundtrip("test_arrow_writer_list.parquet", batch, None);
+ }
+
+ #[test]
+ fn arrow_writer_list_non_null() {
+ // define schema
+ let schema = Schema::new(vec![Field::new(
+ "a",
+ DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
+ false,
+ )]);
+
+ // create some data
+ let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+
+ // Construct a buffer for value offsets, for the nested array:
+ // [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
+ let a_value_offsets =
+ arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+ // Construct a list array from the above two
+ let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
+ "item",
+ DataType::Int32,
+ false,
+ ))))
+ .len(5)
+ .add_buffer(a_value_offsets)
+ .add_child_data(a_values.data().clone())
+ .build();
+ let a = ListArray::from(a_list_data);
+
+ // build a record batch
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+ // This test fails if the max row group size is less than the batch's length
+ // see https://github.com/apache/arrow-rs/issues/518
+ assert_eq!(batch.column(0).data().null_count(), 0);
+
+ roundtrip("test_arrow_writer_list_non_null.parquet", batch, None);
}
#[test]
@@ -733,39 +764,16 @@ mod tests {
let string_values = StringArray::from(raw_string_values.clone());
let binary_values = BinaryArray::from(raw_binary_value_refs);
let batch = RecordBatch::try_new(
- Arc::new(schema.clone()),
+ Arc::new(schema),
vec![Arc::new(string_values), Arc::new(binary_values)],
)
.unwrap();
- let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
- let mut writer =
- ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
- .unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
-
- file.seek(std::io::SeekFrom::Start(0)).unwrap();
- let file_reader = SerializedFileReader::new(file).unwrap();
- let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
- let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
-
- let batch = record_batch_reader.next().unwrap().unwrap();
- let string_col = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- let binary_col = batch
- .column(1)
- .as_any()
- .downcast_ref::<BinaryArray>()
- .unwrap();
-
- for i in 0..batch.num_rows() {
- assert_eq!(string_col.value(i), raw_string_values[i]);
- assert_eq!(binary_col.value(i), raw_binary_values[i].as_slice());
- }
+ roundtrip(
+ "test_arrow_writer_binary.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -779,36 +787,16 @@ mod tests {
dec_builder.append_value(0).unwrap();
dec_builder.append_value(-100).unwrap();
- let raw_decimal_i128_values: Vec<i128> = vec![10_000, 50_000, 0, -100];
let decimal_values = dec_builder.finish();
- let batch = RecordBatch::try_new(
- Arc::new(schema.clone()),
- vec![Arc::new(decimal_values)],
- )
- .unwrap();
-
- let mut file = get_temp_file("test_arrow_writer_decimal.parquet", &[]);
- let mut writer =
- ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+ let batch =
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)])
.unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
-
- file.seek(std::io::SeekFrom::Start(0)).unwrap();
- let file_reader = SerializedFileReader::new(file).unwrap();
- let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
- let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
- let batch = record_batch_reader.next().unwrap().unwrap();
- let decimal_col = batch
- .column(0)
- .as_any()
- .downcast_ref::<DecimalArray>()
- .unwrap();
-
- for i in 0..batch.num_rows() {
- assert_eq!(decimal_col.value(i), raw_decimal_i128_values[i]);
- }
+ roundtrip(
+ "test_arrow_writer_decimal.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 2e95039..0af0f9e 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -86,9 +86,11 @@ impl LevelType {
const fn level_increment(&self) -> i16 {
match self {
LevelType::Root => 0,
- LevelType::List(is_nullable)
- | LevelType::Struct(is_nullable)
- | LevelType::Primitive(is_nullable) => *is_nullable as i16,
+ // List repetition adds a constant 1
+ LevelType::List(is_nullable) => 1 + *is_nullable as i16,
+ LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
+ *is_nullable as i16
+ }
}
}
}
@@ -334,37 +336,27 @@ impl LevelInfo {
let mut merged_array_mask = Vec::with_capacity(min_len);
let max_definition = match (self.level_type, level_type) {
- (LevelType::Root, LevelType::Struct(is_nullable)) => {
- // If the struct is non-nullable, its def level doesn't increment
- is_nullable as i16
- }
- (LevelType::Root, _) => 1,
+ // Handle the illegal cases
(_, LevelType::Root) => {
unreachable!("Cannot have a root as a child")
}
- (LevelType::List(_), _) => {
- self.max_definition + 1 + level_type.level_increment()
- }
- (LevelType::Struct(_), _) => {
- self.max_definition + level_type.level_increment()
- }
- (_, LevelType::List(is_nullable)) => {
- // if the child is a list, even if its parent is a root
- self.max_definition + 1 + is_nullable as i16
- }
(LevelType::Primitive(_), _) => {
unreachable!("Cannot have a primitive parent for any type")
}
+ // The general case
+ (_, _) => self.max_definition + level_type.level_increment(),
};
match (self.level_type, level_type) {
(LevelType::List(_), LevelType::List(is_nullable)) => {
- // parent is a list or descendant of a list, and child is a list
+ // Parent is a list or descendant of a list, and child is a list
let reps = self.repetition.clone().unwrap();
- // Calculate the 2 list hierarchy definitions in advance
- // List is not empty, but null
- let l2 = max_definition - is_nullable as i16;
- // List is not empty, and not null
+
+ // List is null, and not empty
+ let l1 = max_definition - is_nullable as i16;
+ // List is not null, but is empty
+ let l2 = max_definition - 1;
+ // List is not null, and not empty
let l3 = max_definition;
let mut nulls_seen = 0;
@@ -399,7 +391,9 @@ impl LevelInfo {
let merged_mask = parent_mask && child_mask;
if child_len == 0 {
- definition.push(parent_def);
+ // Empty slot, i.e. {"parent": {"child": [] } }
+ // Nullness takes priority over emptiness
+ definition.push(if child_mask { l2 } else { l1 });
repetition.push(parent_rep);
merged_array_mask.push(merged_mask);
} else {
@@ -419,7 +413,7 @@ impl LevelInfo {
} else if child_mask {
l3
} else {
- l2
+ l1
});
repetition.push(rep);
merged_array_mask.push(merged_mask);
@@ -506,9 +500,11 @@ impl LevelInfo {
// Encountering a list for the first time.
// Calculate the 2 list hierarchy definitions in advance
- // List is not empty, but null (if nullable)
- let l2 = max_definition - is_nullable as i16;
- // List is not empty, and not null
+ // List is null, and not empty
+ let l1 = max_definition - 1 - is_nullable as i16;
+ // List is not null, but is empty
+ let l2 = max_definition - 1;
+ // List is not null, and not empty
let l3 = max_definition;
self.definition
@@ -523,20 +519,24 @@ impl LevelInfo {
match (parent_mask, child_len) {
(true, 0) => {
- // empty slot that is valid, i.e. {"parent": {"child": [] } }
- definition.push(if child_mask { l3 } else { l2 });
+ // Empty slot, i.e. {"parent": {"child": [] } }
+ // Nullness takes priority over emptiness
+ definition.push(if child_mask { l2 } else { l1 });
repetition.push(0);
merged_array_mask.push(child_mask);
}
(false, 0) => {
+ // Inherit the parent definition as parent was null
definition.push(*def);
repetition.push(0);
merged_array_mask.push(child_mask);
}
(true, _) => {
(child_from..child_to).for_each(|child_index| {
- definition.push(if child_mask { l3 } else { l2 });
- // mark the first child slot as 0, and the next as 1
+ // l1 and l3 make sense as list is not empty,
+ // but we reflect that it's either null or not
+ definition.push(if child_mask { l3 } else { l1 });
+ // Mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
0
} else {
@@ -547,6 +547,7 @@ impl LevelInfo {
}
(false, _) => {
(child_from..child_to).for_each(|child_index| {
+ // Inherit the parent definition as parent was null
definition.push(*def);
// mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
@@ -867,11 +868,12 @@ mod tests {
LevelType::Primitive(false),
);
let expected_levels = LevelInfo {
- definition: vec![1; 10],
+ // As it is non-null, definitions can be omitted
+ definition: vec![0; 10],
repetition: None,
array_offsets,
array_mask,
- max_definition: 1,
+ max_definition: 0,
level_type: LevelType::Primitive(false),
offset: 0,
length: 10,
@@ -948,13 +950,13 @@ mod tests {
// - Calculate the level at the list
// - Calculate the level at the list's child
// We do not do this in these tests, thus the levels are 1 less.
- definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
+ definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
true, true, false, true, true, true, true, true, true, true, true, true,
],
- max_definition: 1,
+ max_definition: 2,
level_type: LevelType::List(true),
offset: 0,
length: 11, // the child has 11 slots
@@ -1006,14 +1008,14 @@ mod tests {
// 2 3 [4] are 0
// 4 5 6 7 [8] are 1 (defined at level 1 only)
// 8 9 10 [11] are 2 (defined at both levels)
- definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
+ definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
false, false, false, false, false, true, true, true, true, true, true,
true,
],
- max_definition: 2,
+ max_definition: 3,
level_type: LevelType::List(true),
offset: 0,
length: 11,
@@ -1061,7 +1063,7 @@ mod tests {
// 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
// 4: [[116, 117], [118, 119], [120, 121]]
definition: vec![
- 0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
+ 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
],
repetition: Some(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
@@ -1072,7 +1074,7 @@ mod tests {
true, true, true, true, true, true, true, true, true, true, true, true,
true,
],
- max_definition: 4,
+ max_definition: 5,
level_type: LevelType::List(true),
offset: 0,
length: 22,
@@ -1121,11 +1123,11 @@ mod tests {
// 2: [4, 5]
// 3: [6, 7]
let expected_levels = LevelInfo {
- definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
+ definition: vec![1, 3, 3, 3, 3, 3, 3, 3],
repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
array_offsets,
array_mask: vec![false, true, true, true, true, true, true, true],
- max_definition: 2,
+ max_definition: 3,
level_type: LevelType::List(true),
offset: 0,
length: 8,
@@ -1167,14 +1169,14 @@ mod tests {
// 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
// 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
let expected_levels = LevelInfo {
- definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
+ definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5],
repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
array_mask: vec![
false, true, true, true, false, true, true, true, true, true, true, true,
true, true, true, true, true, true,
],
array_offsets,
- max_definition: 4,
+ max_definition: 5,
level_type: LevelType::List(true),
offset: 0,
length: 16,
@@ -1416,11 +1418,11 @@ mod tests {
let list_level = levels.get(0).unwrap();
let expected_level = LevelInfo {
- definition: vec![1, 1, 1, 1, 1],
+ definition: vec![0, 0, 0, 0, 0],
repetition: None,
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, true, true, true, true],
- max_definition: 1,
+ max_definition: 0,
level_type: LevelType::Primitive(false),
offset: 0,
length: 5,