You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/05/11 05:42:49 UTC
[arrow-rs] branch master updated: Fix null struct and list roundtrip (#270)
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8226219 Fix null struct and list roundtrip (#270)
8226219 is described below
commit 8226219fe7104f6c8a2740806f96f02c960d991c
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Tue May 11 07:42:41 2021 +0200
Fix null struct and list roundtrip (#270)
* fix null struct and list inconsistencies in writer
* fix list reader null and empty slot calculation
* remove stray TODOs
---
parquet/src/arrow/array_reader.rs | 95 ++++-----
parquet/src/arrow/arrow_writer.rs | 54 ++---
parquet/src/arrow/levels.rs | 430 +++++++++++++++++---------------------
3 files changed, 265 insertions(+), 314 deletions(-)
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f209b8b..f54e446 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -615,6 +615,8 @@ pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
item_type: ArrowType,
list_def_level: i16,
list_rep_level: i16,
+ list_empty_def_level: i16,
+ list_null_def_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
_marker: PhantomData<OffsetSize>,
@@ -628,6 +630,8 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
item_type: ArrowType,
def_level: i16,
rep_level: i16,
+ list_null_def_level: i16,
+ list_empty_def_level: i16,
) -> Self {
Self {
item_reader,
@@ -635,6 +639,8 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
item_type,
list_def_level: def_level,
list_rep_level: rep_level,
+ list_null_def_level,
+ list_empty_def_level,
def_level_buffer: None,
rep_level_buffer: None,
_marker: PhantomData,
@@ -843,61 +849,49 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
// Where n is the max definition level of the list's parent.
// If a Parquet schema's only leaf is the list, then n = 0.
- // TODO: ARROW-10391 - add a test case with a non-nullable child, check if max is 3
- let list_field_type = match self.get_data_type() {
- ArrowType::List(field)
- | ArrowType::FixedSizeList(field, _)
- | ArrowType::LargeList(field) => field,
- _ => {
- // Panic: this is safe as we only write lists from list datatypes
- unreachable!()
- }
- };
- let max_list_def_range = if list_field_type.is_nullable() { 3 } else { 2 };
- let max_list_definition = *(def_levels.iter().max().unwrap());
- // TODO: ARROW-10391 - Find a reliable way of validating deeply-nested lists
- // debug_assert!(
- // max_list_definition >= max_list_def_range,
- // "Lift definition max less than range"
- // );
- let list_null_def = max_list_definition - max_list_def_range;
- let list_empty_def = max_list_definition - 1;
- let mut null_list_indices: Vec<usize> = Vec::new();
- for i in 0..def_levels.len() {
- if def_levels[i] == list_null_def {
- null_list_indices.push(i);
- }
- }
+ // If the list index is at empty definition, the child slot is null
+ let null_list_indices: Vec<usize> = def_levels
+ .iter()
+ .enumerate()
+ .filter_map(|(index, def)| {
+ if *def <= self.list_empty_def_level {
+ Some(index)
+ } else {
+ None
+ }
+ })
+ .collect();
let batch_values = match null_list_indices.len() {
0 => next_batch_array.clone(),
_ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?,
};
- // null list has def_level = 0
- // empty list has def_level = 1
- // null item in a list has def_level = 2
- // non-null item has def_level = 3
// first item in each list has rep_level = 0, subsequent items have rep_level = 1
-
let mut offsets: Vec<OffsetSize> = Vec::new();
let mut cur_offset = OffsetSize::zero();
- for i in 0..rep_levels.len() {
- if rep_levels[i] == 0 {
- offsets.push(cur_offset)
+ def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
+ if *r == 0 || d == &self.list_empty_def_level {
+ offsets.push(cur_offset);
}
- if def_levels[i] >= list_empty_def {
+ if d > &self.list_empty_def_level {
cur_offset += OffsetSize::one();
}
- }
+ });
offsets.push(cur_offset);
let num_bytes = bit_util::ceil(offsets.len(), 8);
- let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+ // TODO: A useful optimization is to use the null count to fill with
+ // 0 or null, to reduce individual bits set in a loop.
+ // To favour dense data, set every slot to true, then unset
+ let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
let null_slice = null_buf.as_slice_mut();
let mut list_index = 0;
for i in 0..rep_levels.len() {
- if rep_levels[i] == 0 && def_levels[i] != 0 {
- bit_util::set_bit(null_slice, list_index);
+ // If the level is lower than empty, then the slot is null.
+ // When a list is non-nullable, its empty level = null level,
+ // so this automatically factors that in.
+ if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
+ bit_util::unset_bit(null_slice, list_index);
}
if rep_levels[i] == 0 {
list_index += 1;
@@ -1282,16 +1276,15 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
let mut new_context = context.clone();
new_context.path.append(vec![list_type.name().to_string()]);
+ // We need to know at what definition a list or its child is null
+ let list_null_def = new_context.def_level;
+ let mut list_empty_def = new_context.def_level;
- match list_type.get_basic_info().repetition() {
- Repetition::REPEATED => {
- new_context.def_level += 1;
- new_context.rep_level += 1;
- }
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
- }
- _ => (),
+ // If the list's root is nullable
+ if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
+ new_context.def_level += 1;
+ // current level is nullable, increment to get level for empty list slot
+ list_empty_def += 1;
}
match list_child.get_basic_info().repetition() {
@@ -1350,6 +1343,8 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
item_reader_type,
new_context.def_level,
new_context.rep_level,
+ list_null_def,
+ list_empty_def,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
@@ -1357,6 +1352,8 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
item_reader_type,
new_context.def_level,
new_context.rep_level,
+ list_null_def,
+ list_empty_def,
)),
_ => {
@@ -2468,6 +2465,8 @@ mod tests {
ArrowType::Int32,
1,
1,
+ 0,
+ 1,
);
let next_batch = list_array_reader.next_batch(1024).unwrap();
@@ -2522,6 +2521,8 @@ mod tests {
ArrowType::Int32,
1,
1,
+ 0,
+ 1,
);
let next_batch = list_array_reader.next_batch(1024).unwrap();
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index df6ce98..be278ed 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -91,7 +91,7 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
let batch_level = LevelInfo::new_from_batch(batch);
let mut row_group_writer = self.writer.next_row_group()?;
for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
- let mut levels = batch_level.calculate_array_levels(array, field, false);
+ let mut levels = batch_level.calculate_array_levels(array, field);
// Reverse levels as we pop() them when writing arrays
levels.reverse();
write_leaves(&mut row_group_writer, array, &mut levels)?;
@@ -793,25 +793,29 @@ mod tests {
let struct_field_g = Field::new(
"g",
DataType::List(Box::new(Field::new("item", DataType::Int16, true))),
+ false,
+ );
+ let struct_field_h = Field::new(
+ "h",
+ DataType::List(Box::new(Field::new("item", DataType::Int16, false))),
true,
);
let struct_field_e = Field::new(
"e",
- DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
- true,
+ DataType::Struct(vec![
+ struct_field_f.clone(),
+ struct_field_g.clone(),
+ struct_field_h.clone(),
+ ]),
+ false,
);
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, true),
- // Note: when the below struct is set to non-nullable, this test fails,
- // but the output data written is correct.
- // Interestingly, pyarrow will read it correctly, but pyspark fails to.
- // This might be a compatibility quirk between arrow and parquet.
- // We have opened https://github.com/apache/arrow-rs/issues/245 to investigate
Field::new(
"c",
DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
- true,
+ false,
),
]);
@@ -831,15 +835,23 @@ mod tests {
// Construct a list array from the above two
let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
.len(5)
- .add_buffer(g_value_offsets)
+ .add_buffer(g_value_offsets.clone())
.add_child_data(g_value.data().clone())
- // .null_bit_buffer(Buffer::from(vec![0b00011011])) // TODO: add to test after resolving other issues
.build();
let g = ListArray::from(g_list_data);
+ // The difference between g and h is that h has a null bitmap
+ let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
+ .len(5)
+ .add_buffer(g_value_offsets)
+ .add_child_data(g_value.data().clone())
+ .null_bit_buffer(Buffer::from(vec![0b00011011]))
+ .build();
+ let h = ListArray::from(h_list_data);
let e = StructArray::from(vec![
(struct_field_f, Arc::new(f) as ArrayRef),
(struct_field_g, Arc::new(g) as ArrayRef),
+ (struct_field_h, Arc::new(h) as ArrayRef),
]);
let c = StructArray::from(vec![
@@ -860,14 +872,10 @@ mod tests {
#[test]
fn arrow_writer_complex_mixed() {
// This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
- // Only writing the "offest_field" column works when "some_nested_object" is non-null.
- // This indicates that a non-null struct should not have a null child (with null values).
- // One observation is that spark doesn't consider the parent struct's nullness,
- // and so, we should investigate the impact of always treating structs as null.
- // See https://github.com/apache/arrow-rs/issues/245.
+ // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
// define schema
- let offset_field = Field::new("offset", DataType::Int32, true);
+ let offset_field = Field::new("offset", DataType::Int32, false);
let partition_field = Field::new("partition", DataType::Int64, true);
let topic_field = Field::new("topic", DataType::Utf8, true);
let schema = Schema::new(vec![Field::new(
@@ -877,7 +885,7 @@ mod tests {
partition_field.clone(),
topic_field.clone(),
]),
- true,
+ false,
)]);
// create some data
@@ -970,14 +978,10 @@ mod tests {
let schema = Schema::new(vec![field_a.clone()]);
// create data
- // When the null buffer of the struct is created, this test fails.
- // It appears that the nullness of the struct is ignored when the
- // struct is read back.
- // See https://github.com/apache/arrow-rs/issues/245
let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
.len(6)
- // .null_bit_buffer(Buffer::from(vec![0b00100111]))
+ .null_bit_buffer(Buffer::from(vec![0b00100111]))
.add_child_data(c.data().clone())
.build();
let b = StructArray::from(b_data);
@@ -989,7 +993,7 @@ mod tests {
let a = StructArray::from(a_data);
assert_eq!(a.null_count(), 0);
- assert_eq!(a.column(0).null_count(), 0);
+ assert_eq!(a.column(0).null_count(), 2);
// build a racord batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
@@ -1362,7 +1366,7 @@ mod tests {
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
- true, // TODO: why does this fail when false? Is it related to logical nulls?
+ false,
))))
.len(5)
.add_buffer(a_value_offsets)
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 2669581..be56726 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -65,10 +65,29 @@ pub(crate) struct LevelInfo {
pub array_mask: Vec<bool>,
/// The maximum definition at this level, 0 at the record batch
pub max_definition: i16,
- /// Whether this array or any of its parents is a list
- pub is_list: bool,
- /// Whether the current array is nullable (affects definition levels)
- pub is_nullable: bool,
+ /// The type of array represented by this level info
+ pub level_type: LevelType,
+}
+
+/// LevelType defines the type of level, and whether it is nullable or not
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(crate) enum LevelType {
+ Root,
+ List(bool),
+ Struct(bool),
+ Primitive(bool),
+}
+
+impl LevelType {
+ #[inline]
+ const fn level_increment(&self) -> i16 {
+ match self {
+ LevelType::Root => 0,
+ LevelType::List(is_nullable)
+ | LevelType::Struct(is_nullable)
+ | LevelType::Primitive(is_nullable) => *is_nullable as i16,
+ }
+ }
}
impl LevelInfo {
@@ -87,10 +106,7 @@ impl LevelInfo {
// all values at a batch-level are non-null
array_mask: vec![true; num_rows],
max_definition: 0,
- is_list: false,
- // a batch is treated as nullable even though it has no nulls,
- // this is required to compute nested type levels correctly
- is_nullable: false,
+ level_type: LevelType::Root,
}
}
@@ -110,7 +126,6 @@ impl LevelInfo {
&self,
array: &ArrayRef,
field: &Field,
- is_parent_struct: bool,
) -> Vec<Self> {
let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array);
match array.data_type() {
@@ -120,8 +135,8 @@ impl LevelInfo {
array_offsets: self.array_offsets.clone(),
array_mask,
max_definition: self.max_definition.max(1),
- is_list: self.is_list,
- is_nullable: true, // always nullable as all values are nulls
+ // Null type is always nullable
+ level_type: LevelType::Primitive(true),
}],
DataType::Boolean
| DataType::Int8
@@ -152,9 +167,7 @@ impl LevelInfo {
vec![self.calculate_child_levels(
array_offsets,
array_mask,
- false,
- is_parent_struct,
- field.is_nullable(),
+ LevelType::Primitive(field.is_nullable()),
)]
}
DataType::List(list_field) | DataType::LargeList(list_field) => {
@@ -162,10 +175,7 @@ impl LevelInfo {
let list_level = self.calculate_child_levels(
array_offsets,
array_mask,
- true,
- // the list could come from a struct, but its children will all be false
- is_parent_struct,
- field.is_nullable(),
+ LevelType::List(field.is_nullable()),
);
// Construct the child array of the list, and get its offset + mask
@@ -207,13 +217,11 @@ impl LevelInfo {
vec![list_level.calculate_child_levels(
child_offsets,
child_mask,
- false,
- false, // always false
- list_field.is_nullable(),
+ LevelType::Primitive(list_field.is_nullable()),
)]
}
DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
- list_level.calculate_array_levels(&child_array, list_field, false)
+ list_level.calculate_array_levels(&child_array, list_field)
}
DataType::FixedSizeList(_, _) => unimplemented!(),
DataType::Union(_) => unimplemented!(),
@@ -228,10 +236,7 @@ impl LevelInfo {
let struct_level = self.calculate_child_levels(
array_offsets,
array_mask,
- false,
- // struct's own parent could be a struct
- is_parent_struct,
- field.is_nullable(),
+ LevelType::Struct(field.is_nullable()),
);
let mut struct_levels = vec![];
struct_array
@@ -239,12 +244,8 @@ impl LevelInfo {
.into_iter()
.zip(struct_fields)
.for_each(|(child_array, child_field)| {
- let mut levels = struct_level.calculate_array_levels(
- child_array,
- child_field,
- // this is the only place where this is always true
- true,
- );
+ let mut levels =
+ struct_level.calculate_array_levels(child_array, child_field);
struct_levels.append(&mut levels);
});
struct_levels
@@ -258,9 +259,7 @@ impl LevelInfo {
vec![self.calculate_child_levels(
array_offsets,
array_mask,
- false,
- is_parent_struct,
- field.is_nullable(),
+ LevelType::Primitive(field.is_nullable()),
)]
}
}
@@ -315,80 +314,39 @@ impl LevelInfo {
// we use 64-bit offsets to also accommodate large arrays
array_offsets: Vec<i64>,
array_mask: Vec<bool>,
- is_list: bool,
- is_parent_struct: bool,
- is_nullable: bool,
+ level_type: LevelType,
) -> Self {
let min_len = *(array_offsets.last().unwrap()) as usize;
let mut definition = Vec::with_capacity(min_len);
let mut repetition = Vec::with_capacity(min_len);
let mut merged_array_mask = Vec::with_capacity(min_len);
- // determine the total level increment based on data types
- let max_definition = match is_list {
- false => {
- // first exception, start of a batch, and not list
- if self.max_definition == 0 {
- 1
- } else if self.is_list {
- // second exception, always increment after a list
- self.max_definition + 1
- } else if is_parent_struct && !self.is_nullable {
- // if the parent is a non-null struct, don't increment
- self.max_definition
- } else {
- self.max_definition + is_nullable as i16
- }
+ let max_definition = match (self.level_type, level_type) {
+ (LevelType::Root, LevelType::Struct(is_nullable)) => {
+ // If the struct is non-nullable, its def level doesn't increment
+ is_nullable as i16
}
- true => {
- if is_parent_struct && !self.is_nullable {
- self.max_definition + is_nullable as i16
- } else {
- self.max_definition + 1 + is_nullable as i16
- }
+ (LevelType::Root, _) => 1,
+ (_, LevelType::Root) => {
+ unreachable!("Cannot have a root as a child")
+ }
+ (LevelType::List(_), _) => {
+ self.max_definition + 1 + level_type.level_increment()
+ }
+ (LevelType::Struct(_), _) => {
+ self.max_definition + level_type.level_increment()
+ }
+ (_, LevelType::List(is_nullable)) => {
+ // if the child is a list, even if its parent is a root
+ self.max_definition + 1 + is_nullable as i16
+ }
+ (LevelType::Primitive(_), _) => {
+ unreachable!("Cannot have a primitive parent for any type")
}
};
- match (self.is_list, is_list) {
- (false, false) => {
- self.definition
- .iter()
- .zip(array_mask.into_iter().zip(&self.array_mask))
- .for_each(|(def, (child_mask, parent_mask))| {
- merged_array_mask.push(*parent_mask && child_mask);
- match (parent_mask, child_mask) {
- (true, true) => {
- definition.push(max_definition);
- }
- (true, false) => {
- // The child is only legally null if its array is nullable.
- // Thus parent's max_definition is lower
- definition.push(if *def <= self.max_definition {
- *def
- } else {
- self.max_definition
- });
- }
- // if the parent was false, retain its definitions
- (false, _) => {
- definition.push(*def);
- }
- }
- });
-
- debug_assert_eq!(definition.len(), merged_array_mask.len());
-
- Self {
- definition,
- repetition: self.repetition.clone(), // it's None
- array_offsets,
- array_mask: merged_array_mask,
- max_definition,
- is_list: false,
- is_nullable,
- }
- }
- (true, true) => {
+ match (self.level_type, level_type) {
+ (LevelType::List(_), LevelType::List(is_nullable)) => {
// parent is a list or descendant of a list, and child is a list
let reps = self.repetition.clone().unwrap();
// Calculate the 2 list hierarchy definitions in advance
@@ -466,11 +424,10 @@ impl LevelInfo {
array_offsets,
array_mask: merged_array_mask,
max_definition,
- is_list: true,
- is_nullable,
+ level_type,
}
}
- (true, false) => {
+ (LevelType::List(_), _) => {
// List and primitive (or struct).
// The list can have more values than the primitive, indicating that there
// are slots where the list is empty. We use a counter to track this behaviour.
@@ -519,11 +476,10 @@ impl LevelInfo {
array_offsets: self.array_offsets.clone(),
array_mask: merged_array_mask,
max_definition,
- is_list: true,
- is_nullable,
+ level_type,
}
}
- (false, true) => {
+ (_, LevelType::List(is_nullable)) => {
// Encountering a list for the first time.
// Calculate the 2 list hierarchy definitions in advance
@@ -545,11 +501,7 @@ impl LevelInfo {
match (parent_mask, child_len) {
(true, 0) => {
// empty slot that is valid, i.e. {"parent": {"child": [] } }
- definition.push(if child_mask {
- l2
- } else {
- self.max_definition
- });
+ definition.push(if child_mask { l3 } else { l2 });
repetition.push(0);
merged_array_mask.push(child_mask);
}
@@ -593,8 +545,44 @@ impl LevelInfo {
array_offsets,
array_mask: merged_array_mask,
max_definition,
- is_list: true,
- is_nullable,
+ level_type,
+ }
+ }
+ (_, _) => {
+ self.definition
+ .iter()
+ .zip(array_mask.into_iter().zip(&self.array_mask))
+ .for_each(|(current_def, (child_mask, parent_mask))| {
+ merged_array_mask.push(*parent_mask && child_mask);
+ match (parent_mask, child_mask) {
+ (true, true) => {
+ definition.push(max_definition);
+ }
+ (true, false) => {
+ // The child is only legally null if its array is nullable.
+ // Thus parent's max_definition is lower
+ definition.push(if *current_def <= self.max_definition {
+ *current_def
+ } else {
+ self.max_definition
+ });
+ }
+ // if the parent was false, retain its definitions
+ (false, _) => {
+ definition.push(*current_def);
+ }
+ }
+ });
+
+ debug_assert_eq!(definition.len(), merged_array_mask.len());
+
+ Self {
+ definition,
+ repetition: self.repetition.clone(), // it's None
+ array_offsets,
+ array_mask: merged_array_mask,
+ max_definition,
+ level_type,
}
}
}
@@ -647,14 +635,20 @@ impl LevelInfo {
.into_iter()
.map(|v| v as i64)
.collect::<Vec<i64>>();
- let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect();
- (offsets, masks)
+ let array_mask = match array.data().null_buffer() {
+ Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
+ None => vec![true; array.len()],
+ };
+ (offsets, array_mask)
}
DataType::LargeList(_) => {
let offsets =
unsafe { array.data().buffers()[0].typed_data::<i64>() }.to_vec();
- let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect();
- (offsets, masks)
+ let array_mask = match array.data().null_buffer() {
+ Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
+ None => vec![true; array.len()],
+ };
+ (offsets, array_mask)
}
DataType::FixedSizeBinary(value_len) => {
let array_mask = match array.data().null_buffer() {
@@ -676,7 +670,14 @@ impl LevelInfo {
/// Given a level's information, calculate the offsets required to index an array correctly.
pub(crate) fn filter_array_indices(&self) -> Vec<usize> {
// happy path if not dealing with lists
- if !self.is_list {
+ let is_nullable = match self.level_type {
+ LevelType::Primitive(is_nullable) => is_nullable,
+ _ => panic!(
+ "Cannot filter indices on a non-primitive array, found {:?}",
+ self.level_type
+ ),
+ };
+ if self.repetition.is_none() {
return self
.definition
.iter()
@@ -697,7 +698,7 @@ impl LevelInfo {
if *def == self.max_definition {
filtered.push(index);
}
- if *def >= self.max_definition - self.is_nullable as i16 {
+ if *def >= self.max_definition - is_nullable as i16 {
index += 1;
}
});
@@ -746,8 +747,7 @@ mod tests {
array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
array_mask: vec![true, true], // both lists defined
max_definition: 0,
- is_list: false, // root is never list
- is_nullable: false, // root in example is non-nullable
+ level_type: LevelType::Root,
};
// offset into array, each level1 has 2 values
let array_offsets = vec![0, 2, 4];
@@ -757,9 +757,7 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- false,
+ LevelType::List(false),
);
//
let expected_levels = LevelInfo {
@@ -768,8 +766,7 @@ mod tests {
array_offsets,
array_mask: vec![true, true, true, true],
max_definition: 1,
- is_list: true,
- is_nullable: false,
+ level_type: LevelType::List(false),
};
// the separate asserts make it easier to see what's failing
assert_eq!(&levels.definition, &expected_levels.definition);
@@ -777,8 +774,7 @@ mod tests {
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
// this assert is to help if there are more variables added to the struct
assert_eq!(&levels, &expected_levels);
@@ -789,9 +785,7 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- false,
+ LevelType::List(false),
);
let expected_levels = LevelInfo {
definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
@@ -799,16 +793,14 @@ mod tests {
array_offsets,
array_mask: vec![true; 10],
max_definition: 2,
- is_list: true,
- is_nullable: false,
+ level_type: LevelType::List(false),
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
@@ -821,8 +813,7 @@ mod tests {
array_offsets: (0..=10).collect(),
array_mask: vec![true; 10],
max_definition: 0,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Root,
};
let array_offsets: Vec<i64> = (0..=10).collect();
let array_mask = vec![true; 10];
@@ -830,9 +821,7 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask.clone(),
- false,
- false,
- false,
+ LevelType::Primitive(false),
);
let expected_levels = LevelInfo {
definition: vec![1; 10],
@@ -840,8 +829,7 @@ mod tests {
array_offsets,
array_mask,
max_definition: 1,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Primitive(false),
};
assert_eq!(&levels, &expected_levels);
}
@@ -855,8 +843,7 @@ mod tests {
array_offsets: (0..=5).collect(),
array_mask: vec![true, true, true, true, true],
max_definition: 0,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Root,
};
let array_offsets: Vec<i64> = (0..=5).collect();
let array_mask = vec![true, false, true, true, false];
@@ -864,9 +851,7 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask.clone(),
- false,
- false,
- true,
+ LevelType::Primitive(true),
);
let expected_levels = LevelInfo {
definition: vec![1, 0, 1, 1, 0],
@@ -874,8 +859,7 @@ mod tests {
array_offsets,
array_mask,
max_definition: 1,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Primitive(true),
};
assert_eq!(&levels, &expected_levels);
}
@@ -890,8 +874,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, true, true, true, true],
max_definition: 0,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Root,
};
let array_offsets = vec![0, 2, 2, 4, 8, 11];
let array_mask = vec![true, false, true, true, true];
@@ -899,9 +882,7 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- true,
+ LevelType::List(true),
);
// array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
// all values are defined as we do not have nulls on the root (batch)
@@ -912,22 +893,24 @@ mod tests {
// 3: 0, 1, 1, 1
// 4: 0, 1, 1
let expected_levels = LevelInfo {
- definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
+ // The levels are normally 2 because we:
+ // - Calculate the level at the list
+ // - Calculate the level at the list's child
+ // We do not do this in these tests, thus the levels are 1 less.
+ definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
true, true, false, true, true, true, true, true, true, true, true, true,
],
- max_definition: 2,
- is_list: true,
- is_nullable: true,
+ max_definition: 1,
+ level_type: LevelType::List(true),
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
@@ -952,8 +935,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![false, true, false, true, true],
max_definition: 1,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Struct(true),
};
let array_offsets = vec![0, 2, 2, 4, 8, 11];
let array_mask = vec![true, false, true, true, true];
@@ -961,9 +943,7 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- true,
+ LevelType::List(true),
);
let expected_levels = LevelInfo {
// 0 1 [2] are 0 (not defined at level 1)
@@ -971,23 +951,21 @@ mod tests {
// 2 3 [4] are 0
// 4 5 6 7 [8] are 1 (defined at level 1 only)
// 8 9 10 [11] are 2 (defined at both levels)
- definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
+ definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
false, false, false, false, false, true, true, true, true, true, true,
true,
],
- max_definition: 3,
- is_nullable: true,
- is_list: true,
+ max_definition: 2,
+ level_type: LevelType::List(true),
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
// nested lists (using previous test)
@@ -999,9 +977,7 @@ mod tests {
let levels = nested_parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- true,
+ LevelType::List(true),
);
let expected_levels = LevelInfo {
// (def: 0) 0 1 [2] are 0 (take parent)
@@ -1028,7 +1004,7 @@ mod tests {
// 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
// 4: [[116, 117], [118, 119], [120, 121]]
definition: vec![
- 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
+ 0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
],
repetition: Some(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
@@ -1039,17 +1015,15 @@ mod tests {
true, true, true, true, true, true, true, true, true, true, true, true,
true,
],
- max_definition: 5,
- is_nullable: true,
- is_list: true,
+ max_definition: 4,
+ level_type: LevelType::List(true),
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
@@ -1067,8 +1041,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4],
array_mask: vec![true, true, true, true],
max_definition: 1,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Struct(true),
};
// 0: null ([], but mask is false, so it's not just an empty list)
// 1: [1, 2, 3]
@@ -1080,29 +1053,25 @@ mod tests {
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- true,
+ LevelType::List(true),
);
// 0: [null], level 1 is defined, but not 2
// 1: [1, 2, 3]
// 2: [4, 5]
// 3: [6, 7]
let expected_levels = LevelInfo {
- definition: vec![2, 3, 3, 3, 3, 3, 3, 3],
+ definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
array_offsets,
array_mask: vec![false, true, true, true, true, true, true, true],
- max_definition: 3,
- is_list: true,
- is_nullable: true,
+ max_definition: 2,
+ level_type: LevelType::List(true),
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
// nested lists (using previous test)
@@ -1121,9 +1090,7 @@ mod tests {
let levels = nested_parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
- true,
- false,
- true,
+ LevelType::List(true),
);
// We have 7 array values, and at least 15 primitives (from array_offsets)
// 0: (-)[null], parent was null, no value populated here
@@ -1137,24 +1104,22 @@ mod tests {
// 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
// 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
let expected_levels = LevelInfo {
- definition: vec![2, 5, 5, 5, 3, 5, 5, 5, 5, 5, 5, 5, 3, 5, 5, 5, 5, 5],
+ definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
array_mask: vec![
false, true, true, true, false, true, true, true, true, true, true, true,
true, true, true, true, true, true,
],
array_offsets,
- is_list: true,
- is_nullable: true,
- max_definition: 5,
+ max_definition: 4,
+ level_type: LevelType::List(true),
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.is_list, &expected_levels.is_list);
- assert_eq!(&levels.is_nullable, &expected_levels.is_nullable);
+ assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
@@ -1174,8 +1139,7 @@ mod tests {
array_offsets: (0..=6).collect(),
array_mask: vec![true, true, true, true, false, true],
max_definition: 1,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Struct(true),
};
// b's offset and mask
let b_offsets: Vec<i64> = (0..=6).collect();
@@ -1187,11 +1151,13 @@ mod tests {
array_offsets: (0..=6).collect(),
array_mask: vec![true, true, true, false, false, true],
max_definition: 2,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Struct(true),
};
- let b_levels =
- a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true, true);
+ let b_levels = a_levels.calculate_child_levels(
+ b_offsets.clone(),
+ b_mask,
+ LevelType::Struct(true),
+ );
assert_eq!(&b_expected_levels, &b_levels);
// c's offset and mask
@@ -1204,11 +1170,10 @@ mod tests {
array_offsets: c_offsets.clone(),
array_mask: vec![true, false, true, false, false, true],
max_definition: 3,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Struct(true),
};
let c_levels =
- b_levels.calculate_child_levels(c_offsets, c_mask, false, true, true);
+ b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true));
assert_eq!(&c_expected_levels, &c_levels);
}
@@ -1243,8 +1208,7 @@ mod tests {
array_offsets: (0..=5).collect(),
array_mask: vec![true, true, true, true, true],
max_definition: 0,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Root,
};
let batch_level = LevelInfo::new_from_batch(&batch);
@@ -1257,8 +1221,7 @@ mod tests {
.iter()
.zip(batch.schema().fields())
.for_each(|(array, field)| {
- let mut array_levels =
- batch_level.calculate_array_levels(array, field, false);
+ let mut array_levels = batch_level.calculate_array_levels(array, field);
levels.append(&mut array_levels);
});
assert_eq!(levels.len(), 1);
@@ -1273,16 +1236,14 @@ mod tests {
true, true, true, false, true, true, true, true, true, true, true,
],
max_definition: 3,
- is_list: true,
- is_nullable: true,
+ level_type: LevelType::Primitive(true),
};
assert_eq!(&list_level.definition, &expected_level.definition);
assert_eq!(&list_level.repetition, &expected_level.repetition);
assert_eq!(&list_level.array_offsets, &expected_level.array_offsets);
assert_eq!(&list_level.array_mask, &expected_level.array_mask);
assert_eq!(&list_level.max_definition, &expected_level.max_definition);
- assert_eq!(&list_level.is_list, &expected_level.is_list);
- assert_eq!(&list_level.is_nullable, &expected_level.is_nullable);
+ assert_eq!(&list_level.level_type, &expected_level.level_type);
assert_eq!(list_level, &expected_level);
}
@@ -1290,8 +1251,6 @@ mod tests {
fn mixed_struct_list() {
// this tests the level generation from the equivalent arrow_writer_complex test
- // TODO: Investigate failure if struct is null. See https://github.com/apache/arrow-rs/issues/245
-
// define schema
let struct_field_d = Field::new("d", DataType::Float64, true);
let struct_field_f = Field::new("f", DataType::Float32, true);
@@ -1360,8 +1319,7 @@ mod tests {
array_offsets: (0..=5).collect(),
array_mask: vec![true, true, true, true, true],
max_definition: 0,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Root,
};
let batch_level = LevelInfo::new_from_batch(&batch);
@@ -1374,8 +1332,7 @@ mod tests {
.iter()
.zip(batch.schema().fields())
.for_each(|(array, field)| {
- let mut array_levels =
- batch_level.calculate_array_levels(array, field, false);
+ let mut array_levels = batch_level.calculate_array_levels(array, field);
levels.append(&mut array_levels);
});
assert_eq!(levels.len(), 5);
@@ -1389,8 +1346,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, true, true, true, true],
max_definition: 1,
- is_list: false,
- is_nullable: false,
+ level_type: LevelType::Primitive(false),
};
assert_eq!(list_level, &expected_level);
@@ -1403,8 +1359,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, false, false, true, true],
max_definition: 1,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Primitive(true),
};
assert_eq!(list_level, &expected_level);
@@ -1417,8 +1372,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![false, false, false, true, false],
max_definition: 2,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Primitive(true),
};
assert_eq!(list_level, &expected_level);
@@ -1431,8 +1385,7 @@ mod tests {
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, false, true, false, true],
max_definition: 3,
- is_list: false,
- is_nullable: true,
+ level_type: LevelType::Primitive(true),
};
assert_eq!(list_level, &expected_level);
}
@@ -1445,8 +1398,7 @@ mod tests {
array_offsets: vec![0, 3, 3, 6],
array_mask: vec![true, true, true, false, true, true, true],
max_definition: 3,
- is_list: true,
- is_nullable: true,
+ level_type: LevelType::Primitive(true),
};
let expected = vec![0, 1, 2, 3, 4, 5];
@@ -1476,11 +1428,8 @@ mod tests {
.unwrap();
let batch_level = LevelInfo::new_from_batch(&batch);
- let struct_null_level = batch_level.calculate_array_levels(
- batch.column(0),
- batch.schema().field(0),
- false,
- );
+ let struct_null_level =
+ batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
// create second batch
// define schema
@@ -1503,11 +1452,8 @@ mod tests {
.unwrap();
let batch_level = LevelInfo::new_from_batch(&batch);
- let struct_non_null_level = batch_level.calculate_array_levels(
- batch.column(0),
- batch.schema().field(0),
- false,
- );
+ let struct_non_null_level =
+ batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
// The 2 levels should not be the same
if struct_non_null_level == struct_null_level {