You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/27 09:19:54 UTC
[arrow-rs] branch master updated: Support writing nested lists to parquet (#1746)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8e1666a82 Support writing nested lists to parquet (#1746)
8e1666a82 is described below
commit 8e1666a8206f2eea4dd4e55c9365859c6a32a3f0
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri May 27 10:19:48 2022 +0100
Support writing nested lists to parquet (#1746)
* Support writing arbitrarily nested arrow arrays (#1744)
* More tests
* Port more tests
* More tests
* Review feedback
* Reduce test churn
* Port remaining tests
* Review feedback
* Fix clippy
---
parquet/src/arrow/arrow_writer.rs | 90 +-
parquet/src/arrow/levels.rs | 2125 +++++++++++++++----------------------
2 files changed, 897 insertions(+), 1318 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index a5162045b..530dfe2ad 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -33,6 +33,7 @@ use super::schema::{
decimal_length_from_precision,
};
+use crate::arrow::levels::calculate_array_levels;
use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::properties::WriterProperties;
@@ -173,16 +174,15 @@ impl<W: Write> ArrowWriter<W> {
}
}
- let mut levels: Vec<_> = arrays
+ let mut levels = arrays
.iter()
.map(|array| {
- let batch_level = LevelInfo::new(0, array.len());
- let mut levels = batch_level.calculate_array_levels(array, field);
+ let mut levels = calculate_array_levels(array, field)?;
// Reverse levels as we pop() them when writing arrays
levels.reverse();
- levels
+ Ok(levels)
})
- .collect();
+ .collect::<Result<Vec<_>>>()?;
write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
}
@@ -341,26 +341,23 @@ fn write_leaf(
column: &ArrayRef,
levels: LevelInfo,
) -> Result<i64> {
- let indices = levels.filter_array_indices();
- // Slice array according to computed offset and length
- let column = column.slice(levels.offset, levels.length);
+ let indices = levels.non_null_indices();
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
let values = match column.data_type() {
ArrowDataType::Date64 => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
- let array =
- arrow::compute::cast(&column, &ArrowDataType::Date32)?;
+ let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
- arrow::compute::cast(&column, &ArrowDataType::Int32)?
+ arrow::compute::cast(column, &ArrowDataType::Int32)?
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
- get_numeric_array_slice::<Int32Type, _>(array, &indices)
+ get_numeric_array_slice::<Int32Type, _>(array, indices)
}
ArrowDataType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
@@ -373,21 +370,21 @@ fn write_leaf(
array,
|x| x as i32,
);
- get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+ get_numeric_array_slice::<Int32Type, _>(&array, indices)
}
_ => {
- let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?;
+ let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get i32 array");
- get_numeric_array_slice::<Int32Type, _>(array, &indices)
+ get_numeric_array_slice::<Int32Type, _>(array, indices)
}
};
typed.write_batch(
values.as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ColumnWriter::BoolColumnWriter(ref mut typed) => {
@@ -396,9 +393,9 @@ fn write_leaf(
.downcast_ref::<arrow_array::BooleanArray>()
.expect("Unable to get boolean array");
typed.write_batch(
- get_bool_array_slice(array, &indices).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ get_bool_array_slice(array, indices).as_slice(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
@@ -408,7 +405,7 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.expect("Unable to get i64 array");
- get_numeric_array_slice::<Int64Type, _>(array, &indices)
+ get_numeric_array_slice::<Int64Type, _>(array, indices)
}
ArrowDataType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
@@ -421,21 +418,21 @@ fn write_leaf(
array,
|x| x as i64,
);
- get_numeric_array_slice::<Int64Type, _>(&array, &indices)
+ get_numeric_array_slice::<Int64Type, _>(&array, indices)
}
_ => {
- let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?;
+ let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.expect("Unable to get i64 array");
- get_numeric_array_slice::<Int64Type, _>(array, &indices)
+ get_numeric_array_slice::<Int64Type, _>(array, indices)
}
};
typed.write_batch(
values.as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
@@ -447,9 +444,9 @@ fn write_leaf(
.downcast_ref::<arrow_array::Float32Array>()
.expect("Unable to get Float32 array");
typed.write_batch(
- get_numeric_array_slice::<FloatType, _>(array, &indices).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ get_numeric_array_slice::<FloatType, _>(array, indices).as_slice(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ColumnWriter::DoubleColumnWriter(ref mut typed) => {
@@ -458,9 +455,9 @@ fn write_leaf(
.downcast_ref::<arrow_array::Float64Array>()
.expect("Unable to get Float64 array");
typed.write_batch(
- get_numeric_array_slice::<DoubleType, _>(array, &indices).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ get_numeric_array_slice::<DoubleType, _>(array, indices).as_slice(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
@@ -471,8 +468,8 @@ fn write_leaf(
.expect("Unable to get BinaryArray array");
typed.write_batch(
get_binary_array(array).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ArrowDataType::Utf8 => {
@@ -482,8 +479,8 @@ fn write_leaf(
.expect("Unable to get LargeBinaryArray array");
typed.write_batch(
get_string_array(array).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ArrowDataType::LargeBinary => {
@@ -493,8 +490,8 @@ fn write_leaf(
.expect("Unable to get LargeBinaryArray array");
typed.write_batch(
get_large_binary_array(array).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
ArrowDataType::LargeUtf8 => {
@@ -504,8 +501,8 @@ fn write_leaf(
.expect("Unable to get LargeUtf8 array");
typed.write_batch(
get_large_string_array(array).as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
_ => unreachable!("Currently unreachable because data type not supported"),
@@ -518,14 +515,14 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::IntervalYearMonthArray>()
.unwrap();
- get_interval_ym_array_slice(array, &indices)
+ get_interval_ym_array_slice(array, indices)
}
IntervalUnit::DayTime => {
let array = column
.as_any()
.downcast_ref::<arrow_array::IntervalDayTimeArray>()
.unwrap();
- get_interval_dt_array_slice(array, &indices)
+ get_interval_dt_array_slice(array, indices)
}
_ => {
return Err(ParquetError::NYI(
@@ -541,14 +538,14 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::FixedSizeBinaryArray>()
.unwrap();
- get_fsb_array_slice(array, &indices)
+ get_fsb_array_slice(array, indices)
}
ArrowDataType::Decimal(_, _) => {
let array = column
.as_any()
.downcast_ref::<arrow_array::DecimalArray>()
.unwrap();
- get_decimal_array_slice(array, &indices)
+ get_decimal_array_slice(array, indices)
}
_ => {
return Err(ParquetError::NYI(
@@ -559,8 +556,8 @@ fn write_leaf(
};
typed.write_batch(
bytes.as_slice(),
- Some(levels.definition.as_slice()),
- levels.repetition.as_deref(),
+ levels.def_levels(),
+ levels.rep_levels(),
)?
}
};
@@ -593,6 +590,7 @@ macro_rules! def_get_binary_array_fn {
};
}
+// TODO: These methods don't handle non null indices correctly (#1753)
def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
def_get_binary_array_fn!(get_string_array, arrow_array::StringArray);
def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 9dcb00830..073754262 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -40,114 +40,32 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
-use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray};
+use crate::errors::{ParquetError, Result};
+use arrow::array::{
+ make_array, Array, ArrayData, ArrayRef, GenericListArray, MapArray, OffsetSizeTrait,
+ StructArray,
+};
use arrow::datatypes::{DataType, Field};
-
-/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
-///
-/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track
-/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo]
-/// is created, and this is what is used to index into the array when writing data to Parquet.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub(crate) struct LevelInfo {
- /// Array's definition levels
- pub definition: Vec<i16>,
- /// Array's optional repetition levels
- pub repetition: Option<Vec<i16>>,
- /// Array's offsets, 64-bit is used to accommodate large offset arrays
- pub array_offsets: Vec<i64>,
- // TODO: Convert to an Arrow Buffer after ARROW-10766 is merged.
- /// Array's logical validity mask, whcih gets unpacked for list children.
- /// If the parent of an array is null, all children are logically treated as
- /// null. This mask keeps track of that.
- ///
- pub array_mask: Vec<bool>,
- /// The maximum definition at this level, 0 at the record batch
- pub max_definition: i16,
- /// The type of array represented by this level info
- pub level_type: LevelType,
- /// The offset of the current level's array
- pub offset: usize,
- /// The length of the current level's array
- pub length: usize,
-}
-
-/// LevelType defines the type of level, and whether it is nullable or not
-#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-pub(crate) enum LevelType {
- Root,
- List(bool),
- Struct(bool),
- Primitive(bool),
-}
-
-impl LevelType {
- #[inline]
- const fn level_increment(&self) -> i16 {
- match self {
- LevelType::Root => 0,
- // List repetition adds a constant 1
- LevelType::List(is_nullable) => 1 + *is_nullable as i16,
- LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
- *is_nullable as i16
- }
- }
- }
+use std::ops::Range;
+
+/// Performs a depth-first scan of the children of `array`, constructing [`LevelInfo`]
+/// for each leaf column encountered
+pub(crate) fn calculate_array_levels(
+ array: &ArrayRef,
+ field: &Field,
+) -> Result<Vec<LevelInfo>> {
+ let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
+ builder.write(array, 0..array.len());
+ Ok(builder.finish())
}
-impl LevelInfo {
- /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset.
- ///
- /// This is a convenience function to populate the starting point of the traversal.
- pub(crate) fn new(offset: usize, length: usize) -> Self {
- Self {
- // a batch has no definition level yet
- definition: vec![0; length],
- // a batch has no repetition as it is not a list
- repetition: None,
- // a batch has sequential offsets, should be num_rows + 1
- array_offsets: (0..=(length as i64)).collect(),
- // all values at a batch-level are non-null
- array_mask: vec![true; length],
- max_definition: 0,
- level_type: LevelType::Root,
- offset,
- length,
- }
- }
-
- /// Compute nested levels of the Arrow array, recursing into lists and structs.
- ///
- /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays.
- ///
- /// The parent struct's nullness is tracked, as it determines whether the child
- /// max_definition should be incremented.
- /// The 'is_parent_struct' variable asks "is this field's parent a struct?".
- /// * If we are starting at a [RecordBatch](arrow::record_batch::RecordBatch), this is `false`.
- /// * If we are calculating a list's child, this is `false`.
- /// * If we are calculating a struct (i.e. `field.data_type() == Struct`),
- /// this depends on whether the struct is a child of a struct.
- /// * If we are calculating a field inside a [StructArray], this is 'true'.
- pub(crate) fn calculate_array_levels(
- &self,
- array: &ArrayRef,
- field: &Field,
- ) -> Vec<Self> {
- let (array_offsets, array_mask) =
- Self::get_array_offsets_and_masks(array, self.offset, self.length);
- match array.data_type() {
- DataType::Null => vec![Self {
- definition: self.definition.clone(),
- repetition: self.repetition.clone(),
- array_offsets,
- array_mask,
- max_definition: self.max_definition.max(1),
- // Null type is always nullable
- level_type: LevelType::Primitive(true),
- offset: self.offset,
- length: self.length,
- }],
- DataType::Boolean
+/// Returns true if the DataType can be represented as a primitive parquet column,
+/// i.e. a leaf array with no children
+fn is_leaf(data_type: &DataType) -> bool {
+ matches!(
+ data_type,
+ DataType::Null
+ | DataType::Boolean
| DataType::Int8
| DataType::Int16
| DataType::Int32
@@ -171,650 +89,391 @@ impl LevelInfo {
| DataType::Binary
| DataType::LargeBinary
| DataType::Decimal(_, _)
- | DataType::FixedSizeBinary(_) => {
- // we return a vector of 1 value to represent the primitive
- vec![self.calculate_child_levels(
- array_offsets,
- array_mask,
- LevelType::Primitive(field.is_nullable()),
- )]
+ | DataType::FixedSizeBinary(_)
+ )
+}
+
+/// The definition and repetition level of an array within a potentially nested hierarchy
+#[derive(Debug, Default, Clone, Copy)]
+struct LevelContext {
+ /// The current repetition level
+ rep_level: i16,
+ /// The current definition level
+ def_level: i16,
+}
+
+/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+enum LevelInfoBuilder {
+ /// A primitive, leaf array
+ Primitive(LevelInfo),
+ /// A list array, contains the [`LevelInfoBuilder`] of the child and
+ /// the [`LevelContext`] of this list
+ List(Box<LevelInfoBuilder>, LevelContext),
+ /// A struct array, contains the [`LevelInfoBuilder`] of its children and
+ /// the [`LevelContext`] of this struct array
+ Struct(Vec<LevelInfoBuilder>, LevelContext),
+}
+
+impl LevelInfoBuilder {
+ /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
+ fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
+ match field.data_type() {
+ d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
+ parent_ctx,
+ field.is_nullable(),
+ ))),
+ DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => Ok(Self::Primitive(
+ LevelInfo::new(parent_ctx, field.is_nullable()),
+ )),
+ DataType::Struct(children) => {
+ let def_level = match field.is_nullable() {
+ true => parent_ctx.def_level + 1,
+ false => parent_ctx.def_level,
+ };
+
+ let ctx = LevelContext {
+ rep_level: parent_ctx.rep_level,
+ def_level,
+ };
+
+ let children = children
+ .iter()
+ .map(|f| Self::try_new(f, ctx))
+ .collect::<Result<_>>()?;
+
+ Ok(Self::Struct(children, ctx))
}
- DataType::List(list_field) | DataType::LargeList(list_field) => {
- let child_offset = array_offsets[0] as usize;
- let child_len = *array_offsets.last().unwrap() as usize;
- // Calculate the list level
- let list_level = self.calculate_child_levels(
- array_offsets,
- array_mask,
- LevelType::List(field.is_nullable()),
- );
-
- // Construct the child array of the list, and get its offset + mask
- let array_data = array.data();
- let child_data = array_data.child_data().get(0).unwrap();
- let child_array = make_array(child_data.clone());
- let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
- &child_array,
- child_offset,
- child_len - child_offset,
- );
-
- match child_array.data_type() {
- DataType::Null
- | DataType::Boolean
- | DataType::Int8
- | DataType::Int16
- | DataType::Int32
- | DataType::Int64
- | DataType::UInt8
- | DataType::UInt16
- | DataType::UInt32
- | DataType::UInt64
- | DataType::Float16
- | DataType::Float32
- | DataType::Float64
- | DataType::Timestamp(_, _)
- | DataType::Date32
- | DataType::Date64
- | DataType::Time32(_)
- | DataType::Time64(_)
- | DataType::Duration(_)
- | DataType::Interval(_)
- | DataType::Binary
- | DataType::LargeBinary
- | DataType::Utf8
- | DataType::LargeUtf8
- | DataType::Dictionary(_, _)
- | DataType::Decimal(_, _)
- | DataType::FixedSizeBinary(_) => {
- vec![list_level.calculate_child_levels(
- child_offsets,
- child_mask,
- LevelType::Primitive(list_field.is_nullable()),
- )]
- }
- DataType::List(_)
- | DataType::LargeList(_)
- | DataType::Struct(_)
- | DataType::Map(_, _) => {
- list_level.calculate_array_levels(&child_array, list_field)
- }
- DataType::FixedSizeList(_, _) => unimplemented!(),
- DataType::Union(_, _, _) => unimplemented!(),
- }
+ DataType::List(child)
+ | DataType::LargeList(child)
+ | DataType::Map(child, _) => {
+ let def_level = match field.is_nullable() {
+ true => parent_ctx.def_level + 2,
+ false => parent_ctx.def_level + 1,
+ };
+
+ let ctx = LevelContext {
+ rep_level: parent_ctx.rep_level + 1,
+ def_level,
+ };
+
+ let child = Self::try_new(child.as_ref(), ctx)?;
+ Ok(Self::List(Box::new(child), ctx))
}
- DataType::Map(map_field, _) => {
- // Calculate the map level
- let map_level = self.calculate_child_levels(
- array_offsets,
- array_mask,
- // A map is treated like a list as it has repetition
- LevelType::List(field.is_nullable()),
- );
-
- let map_array = array.as_any().downcast_ref::<MapArray>().unwrap();
-
- let key_array = map_array.keys();
- let value_array = map_array.values();
-
- if let DataType::Struct(fields) = map_field.data_type() {
- let key_field = &fields[0];
- let value_field = &fields[1];
-
- let mut map_levels = vec![];
-
- // Get key levels
- let mut key_levels =
- map_level.calculate_array_levels(&key_array, key_field);
- map_levels.append(&mut key_levels);
-
- let mut value_levels =
- map_level.calculate_array_levels(&value_array, value_field);
- map_levels.append(&mut value_levels);
-
- map_levels
- } else {
- panic!(
- "Map field should be a struct, found {:?}",
- map_field.data_type()
- );
- }
+ d => Err(nyi_err!("Datatype {} is not yet supported", d)),
+ }
+ }
+
+ /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns
+ /// as enumerated by a depth-first search
+ fn finish(self) -> Vec<LevelInfo> {
+ match self {
+ LevelInfoBuilder::Primitive(v) => vec![v],
+ LevelInfoBuilder::List(v, _) => v.finish(),
+ LevelInfoBuilder::Struct(v, _) => {
+ v.into_iter().flat_map(|l| l.finish()).collect()
}
- DataType::FixedSizeList(_, _) => unimplemented!(),
- DataType::Struct(struct_fields) => {
- let struct_array: &StructArray = array
+ }
+ }
+
+ /// Given an `array`, write the level data for the elements in `range`
+ fn write(&mut self, array: &ArrayRef, range: Range<usize>) {
+ match array.data_type() {
+ d if is_leaf(d) => self.write_leaf(array, range),
+ DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+ self.write_leaf(array, range)
+ }
+ DataType::Struct(_) => {
+ let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+ self.write_struct(array, range)
+ }
+ DataType::List(_) => {
+ let array = array
.as_any()
- .downcast_ref::<StructArray>()
- .expect("Unable to get struct array");
- let mut struct_level = self.calculate_child_levels(
- array_offsets,
- array_mask,
- LevelType::Struct(field.is_nullable()),
- );
-
- // If the parent field is a list, calculate the children of the struct as if it
- // were a list as well.
- if matches!(self.level_type, LevelType::List(_)) {
- struct_level.level_type = LevelType::List(false);
- }
+ .downcast_ref::<GenericListArray<i32>>()
+ .unwrap();
+ self.write_list(array.value_offsets(), array.data(), range)
+ }
+ DataType::LargeList(_) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<GenericListArray<i64>>()
+ .unwrap();
- let mut struct_levels = vec![];
- struct_array
- .columns()
- .into_iter()
- .zip(struct_fields)
- .for_each(|(child_array, child_field)| {
- let mut levels =
- struct_level.calculate_array_levels(child_array, child_field);
- struct_levels.append(&mut levels);
- });
- struct_levels
+ self.write_list(array.value_offsets(), array.data(), range)
}
- DataType::Union(_, _, _) => unimplemented!(),
- DataType::Dictionary(_, _) => {
- // Need to check for these cases not implemented in C++:
- // - "Writing DictionaryArray with nested dictionary type not yet supported"
- // - "Writing DictionaryArray with null encoded in dictionary type not yet supported"
- // vec![self.get_primitive_def_levels(array, field, array_mask)]
- vec![self.calculate_child_levels(
- array_offsets,
- array_mask,
- LevelType::Primitive(field.is_nullable()),
- )]
+ DataType::Map(_, _) => {
+ let array = array.as_any().downcast_ref::<MapArray>().unwrap();
+ // A Map is just a ListArray<i32> with a StructArray child; we therefore
+ // treat it as such to avoid code duplication
+ self.write_list(array.value_offsets(), array.data(), range)
}
+ _ => unreachable!(),
}
}
- /// Calculate child/leaf array levels.
- ///
- /// The algorithm works by incrementing definitions of array values based on whether:
- /// - a value is optional or required (is_nullable)
- /// - a list value is repeated + optional or required (is_list)
- ///
- /// A record batch always starts at a populated definition = level 0.
- /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column `a`
- /// can only have a maximum level of 1 if it is not null.
- /// If it is not null, we increment by 1, such that the null slots will = level 1.
- /// The above applies to types that have no repetition (anything not a list or map).
- ///
- /// If a batch has lists, then we increment by up to 2 levels:
- /// - 1 level for the list (repeated)
- /// - 1 level if the list itself is nullable (optional)
- ///
- /// A list's child then gets incremented using the above rules.
- ///
- /// *Exceptions*
- ///
- /// There are 2 exceptions from the above rules:
- ///
- /// 1. When at the root of the schema: We always increment the
- /// level regardless of whether the child is nullable or not. If we do not do
- /// this, we could have a non-nullable array having a definition of 0.
- ///
- /// 2. List parent, non-list child: We always increment the level in this case,
- /// regardless of whether the child is nullable or not.
- ///
- /// *Examples*
+ /// Write `range` elements from ListArray `array`
///
- /// A batch with only a primitive that's non-nullable. `<primitive[required]>`:
- /// * We don't increment the definition level as the array is not optional.
- /// * This would leave us with a definition of 0, so the first exception applies.
- /// * The definition level becomes 1.
- ///
- /// A batch with only a primitive that's nullable. `<primitive[optional]>`:
- /// * The definition level becomes 1, as we increment it once.
- ///
- /// A batch with a single non-nullable list (both list and child not null):
- /// * We calculate the level twice, for the list, and for the child.
- /// * At the list, the level becomes 1, where 0 indicates that the list is
- /// empty, and 1 says it's not (determined through offsets).
- /// * At the primitive level, the second exception applies. The level becomes 2.
- fn calculate_child_levels(
- &self,
- // we use 64-bit offsets to also accommodate large arrays
- array_offsets: Vec<i64>,
- array_mask: Vec<bool>,
- level_type: LevelType,
- ) -> Self {
- let min_len = *(array_offsets.last().unwrap()) as usize;
- let mut definition = Vec::with_capacity(min_len);
- let mut repetition = Vec::with_capacity(min_len);
- let mut merged_array_mask = Vec::with_capacity(min_len);
-
- let max_definition = match (self.level_type, level_type) {
- // Handle the illegal cases
- (_, LevelType::Root) => {
- unreachable!("Cannot have a root as a child")
- }
- (LevelType::Primitive(_), _) => {
- unreachable!("Cannot have a primitive parent for any type")
- }
- // The general case
- (_, _) => self.max_definition + level_type.level_increment(),
+ /// Note: MapArrays are ListArray<i32> under the hood and so are dispatched to this method
+ fn write_list<O: OffsetSizeTrait>(
+ &mut self,
+ offsets: &[O],
+ list_data: &ArrayData,
+ range: Range<usize>,
+ ) {
+ let (child, ctx) = match self {
+ Self::List(child, ctx) => (child, ctx),
+ _ => unreachable!(),
+ };
+
+ let offsets = &offsets[range.start..range.end + 1];
+ let child_array = make_array(list_data.child_data()[0].clone());
+
+ let write_non_null_slice =
+ |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+ child.write(&child_array, start_idx..end_idx);
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ let mut rev = rep_levels.iter_mut().rev();
+ let mut remaining = end_idx - start_idx;
+
+ loop {
+ let next = rev.next().unwrap();
+ if *next > ctx.rep_level {
+ // Nested element - ignore
+ continue;
+ }
+
+ remaining -= 1;
+ if remaining == 0 {
+ *next = ctx.rep_level - 1;
+ break;
+ }
+ }
+ })
+ };
+
+ let write_empty_slice = |child: &mut LevelInfoBuilder| {
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ rep_levels.push(ctx.rep_level - 1);
+ let def_levels = leaf.def_levels.as_mut().unwrap();
+ def_levels.push(ctx.def_level - 1);
+ })
};
- match (self.level_type, level_type) {
- (LevelType::List(_), LevelType::List(is_nullable)) => {
- // Parent is a list or descendant of a list, and child is a list
- let reps = self.repetition.clone().unwrap();
-
- // List is null, and not empty
- let l1 = max_definition - is_nullable as i16;
- // List is not null, but is empty
- let l2 = max_definition - 1;
- // List is not null, and not empty
- let l3 = max_definition;
-
- let mut nulls_seen = 0;
-
- self.array_offsets.windows(2).for_each(|w| {
- let start = w[0] as usize;
- let end = w[1] as usize;
- let parent_len = end - start;
-
- if parent_len == 0 {
- // If the parent length is 0, there won't be a slot for the child
- let index = start + nulls_seen - self.offset;
- definition.push(self.definition[index]);
- repetition.push(0);
- merged_array_mask.push(self.array_mask[index]);
- nulls_seen += 1;
+ let write_null_slice = |child: &mut LevelInfoBuilder| {
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ rep_levels.push(ctx.rep_level - 1);
+ let def_levels = leaf.def_levels.as_mut().unwrap();
+ def_levels.push(ctx.def_level - 2);
+ })
+ };
+
+ match list_data.null_bitmap() {
+ Some(nulls) => {
+ let null_offset = list_data.offset() + range.start;
+ // TODO: Faster bitmask iteration (#1757)
+ for (idx, w) in offsets.windows(2).enumerate() {
+ let is_valid = nulls.is_set(idx + null_offset);
+ let start_idx = w[0].to_usize().unwrap();
+ let end_idx = w[1].to_usize().unwrap();
+ if !is_valid {
+ write_null_slice(child)
+ } else if start_idx == end_idx {
+ write_empty_slice(child)
} else {
- (start..end).for_each(|parent_index| {
- let index = parent_index + nulls_seen - self.offset;
- let parent_index = parent_index - self.offset;
-
- // parent is either defined at this level, or earlier
- let parent_def = self.definition[index];
- let parent_rep = reps[index];
- let parent_mask = self.array_mask[index];
-
- // valid parent, index into children
- let child_start = array_offsets[parent_index] as usize;
- let child_end = array_offsets[parent_index + 1] as usize;
- let child_len = child_end - child_start;
- let child_mask = array_mask[parent_index];
- let merged_mask = parent_mask && child_mask;
-
- if child_len == 0 {
- // Empty slot, i.e. {"parent": {"child": [] } }
- // Nullness takes priority over emptiness
- definition.push(if child_mask { l2 } else { l1 });
- repetition.push(parent_rep);
- merged_array_mask.push(merged_mask);
- } else {
- (child_start..child_end).for_each(|child_index| {
- let rep = match (
- parent_index == start,
- child_index == child_start,
- ) {
- (true, true) => parent_rep,
- (true, false) => parent_rep + 2,
- (false, true) => parent_rep,
- (false, false) => parent_rep + 1,
- };
-
- definition.push(if !parent_mask {
- parent_def
- } else if child_mask {
- l3
- } else {
- l1
- });
- repetition.push(rep);
- merged_array_mask.push(merged_mask);
- });
- }
- });
+ write_non_null_slice(child, start_idx, end_idx)
}
- });
-
- debug_assert_eq!(definition.len(), merged_array_mask.len());
-
- let offset = *array_offsets.first().unwrap() as usize;
- let length = *array_offsets.last().unwrap() as usize - offset;
-
- Self {
- definition,
- repetition: Some(repetition),
- array_offsets,
- array_mask: merged_array_mask,
- max_definition,
- level_type,
- offset: offset + self.offset,
- length,
}
}
- (LevelType::List(_), _) => {
- // List and primitive (or struct).
- // The list can have more values than the primitive, indicating that there
- // are slots where the list is empty. We use a counter to track this behaviour.
- let mut nulls_seen = 0;
-
- // let child_max_definition = list_max_definition + is_nullable as i16;
- // child values are a function of parent list offsets
- let reps = self.repetition.as_deref().unwrap();
- self.array_offsets.windows(2).for_each(|w| {
- let start = w[0] as usize;
- let end = w[1] as usize;
- let parent_len = end - start;
-
- if parent_len == 0 {
- let index = start + nulls_seen - self.offset;
- definition.push(self.definition[index]);
- repetition.push(reps[index]);
- merged_array_mask.push(self.array_mask[index]);
- nulls_seen += 1;
+ None => {
+ for w in offsets.windows(2) {
+ let start_idx = w[0].to_usize().unwrap();
+ let end_idx = w[1].to_usize().unwrap();
+ if start_idx == end_idx {
+ write_empty_slice(child)
} else {
- // iterate through the array, adjusting child definitions for nulls
- (start..end).for_each(|child_index| {
- let index = child_index + nulls_seen - self.offset;
- let child_mask = array_mask[child_index - self.offset];
- let parent_mask = self.array_mask[index];
- let parent_def = self.definition[index];
-
- if !parent_mask || parent_def < self.max_definition {
- definition.push(parent_def);
- repetition.push(reps[index]);
- merged_array_mask.push(parent_mask);
- } else {
- definition.push(max_definition - !child_mask as i16);
- repetition.push(reps[index]);
- merged_array_mask.push(child_mask);
- }
- });
+ write_non_null_slice(child, start_idx, end_idx)
}
- });
-
- debug_assert_eq!(definition.len(), merged_array_mask.len());
-
- let offset = *array_offsets.first().unwrap() as usize;
- let length = *array_offsets.last().unwrap() as usize - offset;
-
- Self {
- definition,
- repetition: Some(repetition),
- array_offsets: self.array_offsets.clone(),
- array_mask: merged_array_mask,
- max_definition,
- level_type,
- offset: offset + self.offset,
- length,
}
}
- (_, LevelType::List(is_nullable)) => {
- // Encountering a list for the first time.
- // Calculate the 2 list hierarchy definitions in advance
-
- // List is null, and not empty
- let l1 = max_definition - 1 - is_nullable as i16;
- // List is not null, but is empty
- let l2 = max_definition - 1;
- // List is not null, and not empty
- let l3 = max_definition;
-
- self.definition
- .iter()
- .enumerate()
- .for_each(|(parent_index, def)| {
- let child_from = array_offsets[parent_index];
- let child_to = array_offsets[parent_index + 1];
- let child_len = child_to - child_from;
- let child_mask = array_mask[parent_index];
- let parent_mask = self.array_mask[parent_index];
-
- match (parent_mask, child_len) {
- (true, 0) => {
- // Empty slot, i.e. {"parent": {"child": [] } }
- // Nullness takes priority over emptiness
- definition.push(if child_mask { l2 } else { l1 });
- repetition.push(0);
- merged_array_mask.push(child_mask);
- }
- (false, 0) => {
- // Inherit the parent definition as parent was null
- definition.push(*def);
- repetition.push(0);
- merged_array_mask.push(child_mask);
- }
- (true, _) => {
- (child_from..child_to).for_each(|child_index| {
- // l1 and l3 make sense as list is not empty,
- // but we reflect that it's either null or not
- definition.push(if child_mask { l3 } else { l1 });
- // Mark the first child slot as 0, and the next as 1
- repetition.push(if child_index == child_from {
- 0
- } else {
- 1
- });
- merged_array_mask.push(child_mask);
- });
+ }
+ }
+
+ /// Write `range` elements from StructArray `array`
+ fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
+ let (children, ctx) = match self {
+ Self::Struct(children, ctx) => (children, ctx),
+ _ => unreachable!(),
+ };
+
+ let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+ for child in children {
+ child.visit_leaves(|info| {
+ let len = range.end - range.start;
+
+ let def_levels = info.def_levels.as_mut().unwrap();
+ def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+
+ if let Some(rep_levels) = info.rep_levels.as_mut() {
+ rep_levels.extend(std::iter::repeat(ctx.rep_level).take(len));
+ }
+ })
+ }
+ };
+
+ let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
+ for (child_array, child) in array.columns().into_iter().zip(children) {
+ child.write(child_array, range.clone())
+ }
+ };
+
+ match array.data().null_bitmap() {
+ Some(validity) => {
+ let null_offset = array.data().offset();
+ let mut last_non_null_idx = None;
+ let mut last_null_idx = None;
+
+ // TODO: Faster bitmask iteration (#1757)
+ for i in range.clone() {
+ match validity.is_set(i + null_offset) {
+ true => {
+ if let Some(last_idx) = last_null_idx.take() {
+ write_null(children, last_idx..i)
}
- (false, _) => {
- (child_from..child_to).for_each(|child_index| {
- // Inherit the parent definition as parent was null
- definition.push(*def);
- // mark the first child slot as 0, and the next as 1
- repetition.push(if child_index == child_from {
- 0
- } else {
- 1
- });
- merged_array_mask.push(false);
- });
+ last_non_null_idx.get_or_insert(i);
+ }
+ false => {
+ if let Some(last_idx) = last_non_null_idx.take() {
+ write_non_null(children, last_idx..i)
}
+ last_null_idx.get_or_insert(i);
}
- });
-
- debug_assert_eq!(definition.len(), merged_array_mask.len());
-
- let offset = *array_offsets.first().unwrap() as usize;
- let length = *array_offsets.last().unwrap() as usize - offset;
-
- Self {
- definition,
- repetition: Some(repetition),
- array_offsets,
- array_mask: merged_array_mask,
- max_definition,
- level_type,
- offset,
- length,
+ }
+ }
+
+ if let Some(last_idx) = last_null_idx.take() {
+ write_null(children, last_idx..range.end)
+ }
+
+ if let Some(last_idx) = last_non_null_idx.take() {
+ write_non_null(children, last_idx..range.end)
}
}
- (_, _) => {
- self.definition
- .iter()
- .zip(array_mask.into_iter().zip(&self.array_mask))
- .for_each(|(current_def, (child_mask, parent_mask))| {
- merged_array_mask.push(*parent_mask && child_mask);
- match (parent_mask, child_mask) {
- (true, true) => {
- definition.push(max_definition);
- }
- (true, false) => {
- // The child is only legally null if its array is nullable.
- // Thus parent's max_definition is lower
- definition.push(if *current_def <= self.max_definition {
- *current_def
- } else {
- self.max_definition
- });
- }
- // if the parent was false, retain its definitions
- (false, _) => {
- definition.push(*current_def);
+ None => write_non_null(children, range),
+ }
+ }
+
+ /// Write a primitive array, as defined by [`is_leaf`]
+ fn write_leaf(&mut self, array: &ArrayRef, range: Range<usize>) {
+ let info = match self {
+ Self::Primitive(info) => info,
+ _ => unreachable!(),
+ };
+
+ let len = range.end - range.start;
+
+ match &mut info.def_levels {
+ Some(def_levels) => {
+ def_levels.reserve(len);
+ info.non_null_indices.reserve(len);
+
+ match array.data().null_bitmap() {
+ Some(nulls) => {
+ let nulls_offset = array.data().offset();
+ // TODO: Faster bitmask iteration (#1757)
+ for i in range {
+ match nulls.is_set(i + nulls_offset) {
+ true => {
+ def_levels.push(info.max_def_level);
+ info.non_null_indices.push(i)
+ }
+ false => def_levels.push(info.max_def_level - 1),
}
}
- });
-
- debug_assert_eq!(definition.len(), merged_array_mask.len());
-
- Self {
- definition,
- repetition: self.repetition.clone(), // it's None
- array_offsets,
- array_mask: merged_array_mask,
- max_definition,
- level_type,
- // Inherit parent offset and length
- offset: self.offset,
- length: self.length,
+ }
+ None => {
+ let iter = std::iter::repeat(info.max_def_level).take(len);
+ def_levels.extend(iter);
+ info.non_null_indices.extend(range);
+ }
}
}
+ None => info.non_null_indices.extend(range),
+ }
+
+ if let Some(rep_levels) = &mut info.rep_levels {
+ rep_levels.extend(std::iter::repeat(info.max_rep_level).take(len))
}
}
- /// Get the offsets of an array as 64-bit values, and validity masks as booleans
- /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained
- /// from validity bitmap
- /// - List array offsets will be the value offsets, masks are computed from offsets
- fn get_array_offsets_and_masks(
- array: &ArrayRef,
- offset: usize,
- len: usize,
- ) -> (Vec<i64>, Vec<bool>) {
- match array.data_type() {
- // A NullArray is entirely nulls, despite not containing a null buffer
- DataType::Null => ((0..=(len as i64)).collect(), vec![false; len]),
- DataType::Boolean
- | DataType::Int8
- | DataType::Int16
- | DataType::Int32
- | DataType::Int64
- | DataType::UInt8
- | DataType::UInt16
- | DataType::UInt32
- | DataType::UInt64
- | DataType::Float16
- | DataType::Float32
- | DataType::Float64
- | DataType::Timestamp(_, _)
- | DataType::Date32
- | DataType::Date64
- | DataType::Time32(_)
- | DataType::Time64(_)
- | DataType::Duration(_)
- | DataType::Interval(_)
- | DataType::Binary
- | DataType::LargeBinary
- | DataType::Utf8
- | DataType::LargeUtf8
- | DataType::Struct(_)
- | DataType::Dictionary(_, _)
- | DataType::Decimal(_, _) => {
- let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
- None => vec![true; len],
- };
- ((0..=(len as i64)).collect(), array_mask)
- }
- DataType::List(_) | DataType::Map(_, _) => {
- let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
- let offsets = offsets
- .iter()
- .copied()
- .skip(array.offset() + offset)
- .take(len + 1)
- .map(|v| v as i64)
- .collect::<Vec<i64>>();
- let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
- None => vec![true; len],
- };
- (offsets, array_mask)
- }
- DataType::LargeList(_) => {
- let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
- .iter()
- .skip(array.offset() + offset)
- .take(len + 1)
- .copied()
- .collect();
- let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
- None => vec![true; len],
- };
- (offsets, array_mask)
- }
- DataType::FixedSizeBinary(value_len) => {
- let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
- None => vec![true; len],
- };
- let value_len = *value_len as i64;
- (
- (0..=(len as i64)).map(|v| v * value_len).collect(),
- array_mask,
- )
- }
- DataType::FixedSizeList(_, _) | DataType::Union(_, _, _) => {
- unimplemented!("Getting offsets not yet implemented")
+ /// Visits all children of this node in depth first order
+ fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) {
+ match self {
+ LevelInfoBuilder::Primitive(info) => visit(info),
+ LevelInfoBuilder::List(c, _) => c.visit_leaves(visit),
+ LevelInfoBuilder::Struct(children, _) => {
+ for c in children {
+ c.visit_leaves(visit)
+ }
}
}
}
+}
+/// The data necessary to write a primitive Arrow array to parquet, taking into account
+/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct LevelInfo {
+ /// Array's definition levels
+ ///
+ /// Present if `max_def_level != 0`
+ def_levels: Option<Vec<i16>>,
- /// Given a level's information, calculate the offsets required to index an array correctly.
- pub(crate) fn filter_array_indices(&self) -> Vec<usize> {
- if !matches!(self.level_type, LevelType::Primitive(_)) {
- panic!(
- "Cannot filter indices on a non-primitive array, found {:?}",
- self.level_type
- );
- }
+ /// Array's optional repetition levels
+ ///
+ /// Present if `max_rep_level != 0`
+ rep_levels: Option<Vec<i16>>,
- // happy path if not dealing with lists
- if self.repetition.is_none() {
- return self
- .definition
- .iter()
- .enumerate()
- .filter_map(|(i, def)| {
- if *def == self.max_definition {
- Some(i)
- } else {
- None
- }
- })
- .collect();
- }
+ /// The corresponding array identifying non-null slices of data
+ /// from the primitive array
+ non_null_indices: Vec<usize>,
- let mut filtered = vec![];
- let mut definition_levels = self.definition.iter();
- let mut index = 0;
-
- for len in self.array_offsets.windows(2).map(|s| s[1] - s[0]) {
- if len == 0 {
- // Skip this definition level--the iterator should not be empty, and the definition
- // level be less than max_definition, i.e., a null value)
- assert!(*definition_levels.next().unwrap() < self.max_definition);
- } else {
- for (_, def) in (0..len).zip(&mut definition_levels) {
- if *def == self.max_definition {
- filtered.push(index);
- }
- index += 1;
- }
- }
+ /// The maximum definition level for this leaf column
+ max_def_level: i16,
+
+ /// The maximum repetition for this leaf column
+ max_rep_level: i16,
+}
+
+impl LevelInfo {
+ fn new(ctx: LevelContext, is_nullable: bool) -> Self {
+ let max_rep_level = ctx.rep_level;
+ let max_def_level = match is_nullable {
+ true => ctx.def_level + 1,
+ false => ctx.def_level,
+ };
+
+ Self {
+ def_levels: (max_def_level != 0).then(Vec::new),
+ rep_levels: (max_rep_level != 0).then(Vec::new),
+ non_null_indices: vec![],
+ max_def_level,
+ max_rep_level,
}
+ }
- filtered
+ pub fn def_levels(&self) -> Option<&[i16]> {
+ self.def_levels.as_deref()
}
-}
-/// Convert an Arrow buffer to a boolean array slice
-/// TODO: this was created for buffers, so might not work for bool array, might be slow too
-#[inline]
-fn get_bool_array_slice(
- buffer: &arrow::buffer::Buffer,
- offset: usize,
- len: usize,
-) -> Vec<bool> {
- let data = buffer.as_slice();
- (offset..(len + offset))
- .map(|i| arrow::util::bit_util::get_bit(data, i))
- .collect()
+ pub fn rep_levels(&self) -> Option<&[i16]> {
+ self.rep_levels.as_deref()
+ }
+
+ pub fn non_null_indices(&self) -> &[usize] {
+ &self.non_null_indices
+ }
}
#[cfg(test)]
@@ -825,203 +484,161 @@ mod tests {
use arrow::array::*;
use arrow::buffer::Buffer;
- use arrow::datatypes::{Schema, ToByteSlice};
+ use arrow::datatypes::{Int32Type, Schema, ToByteSlice};
use arrow::record_batch::RecordBatch;
+ use arrow::util::pretty::pretty_format_columns;
#[test]
fn test_calculate_array_levels_twitter_example() {
// based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
// [[a, b, c], [d, e, f, g]], [[h], [i,j]]
- let parent_levels = LevelInfo {
- definition: vec![0, 0],
- repetition: None,
- array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
- array_mask: vec![true, true], // both lists defined
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 0,
- length: 2,
- };
- // offset into array, each level1 has 2 values
- let array_offsets = vec![0, 2, 4];
- let array_mask = vec![true, true];
-
- // calculate level1 levels
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(false),
- );
- //
- let expected_levels = LevelInfo {
- definition: vec![1, 1, 1, 1],
- repetition: Some(vec![0, 1, 0, 1]),
- array_offsets,
- array_mask: vec![true, true, true, true],
- max_definition: 1,
- level_type: LevelType::List(false),
- offset: 0,
- length: 4,
- };
- // the separate asserts make it easier to see what's failing
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_mask, &expected_levels.array_mask);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- // this assert is to help if there are more variables added to the struct
- assert_eq!(&levels, &expected_levels);
-
- // level2
- let parent_levels = levels;
- let array_offsets = vec![0, 3, 7, 8, 10];
- let array_mask = vec![true, true, true, true];
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(false),
- );
- let expected_levels = LevelInfo {
- definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
- repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
- array_offsets,
- array_mask: vec![true; 10],
- max_definition: 2,
- level_type: LevelType::List(false),
- offset: 0,
- length: 10,
+
+ let leaf_type = Field::new("item", DataType::Int32, false);
+ let inner_type = DataType::List(Box::new(leaf_type));
+ let inner_field = Field::new("l2", inner_type.clone(), false);
+ let outer_type = DataType::List(Box::new(inner_field));
+ let outer_field = Field::new("l1", outer_type.clone(), false);
+
+ let primitives = Int32Array::from_iter(0..10);
+
+ // Cannot use from_iter_primitive as always infers nullable
+ let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
+ let inner_list = ArrayDataBuilder::new(inner_type)
+ .len(4)
+ .add_buffer(offsets)
+ .add_child_data(primitives.data().clone())
+ .build()
+ .unwrap();
+
+ let offsets = Buffer::from_iter([0_i32, 2, 4]);
+ let outer_list = ArrayDataBuilder::new(outer_type)
+ .len(2)
+ .add_buffer(offsets)
+ .add_child_data(inner_list)
+ .build()
+ .unwrap();
+ let outer_list = make_array(outer_list);
+
+ let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
+ let expected = LevelInfo {
+ def_levels: Some(vec![2; 10]),
+ rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
+ non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+ max_def_level: 2,
+ max_rep_level: 2,
};
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_mask, &expected_levels.array_mask);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- assert_eq!(&levels, &expected_levels);
+ assert_eq!(&levels[0], &expected);
}
#[test]
fn test_calculate_one_level_1() {
// This test calculates the levels for a non-null primitive array
- let parent_levels = LevelInfo {
- definition: vec![0; 10],
- repetition: None,
- array_offsets: (0..=10).collect(),
- array_mask: vec![true; 10],
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 0,
- length: 10,
- };
- let array_offsets: Vec<i64> = (0..=10).collect();
- let array_mask = vec![true; 10];
+ let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
+ let field = Field::new("item", DataType::Int32, false);
+
+ let levels = calculate_array_levels(&array, &field).unwrap();
+ assert_eq!(levels.len(), 1);
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask.clone(),
- LevelType::Primitive(false),
- );
let expected_levels = LevelInfo {
- // As it is non-null, definitions can be omitted
- definition: vec![0; 10],
- repetition: None,
- array_offsets,
- array_mask,
- max_definition: 0,
- level_type: LevelType::Primitive(false),
- offset: 0,
- length: 10,
+ def_levels: None,
+ rep_levels: None,
+ non_null_indices: (0..10).collect(),
+ max_def_level: 0,
+ max_rep_level: 0,
};
- assert_eq!(&levels, &expected_levels);
+ assert_eq!(&levels[0], &expected_levels);
}
#[test]
fn test_calculate_one_level_2() {
- // This test calculates the levels for a non-null primitive array
- let parent_levels = LevelInfo {
- definition: vec![0; 5],
- repetition: None,
- array_offsets: (0..=5).collect(),
- array_mask: vec![true, true, true, true, true],
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 0,
- length: 5,
- };
- let array_offsets: Vec<i64> = (0..=5).collect();
- let array_mask = vec![true, false, true, true, false];
+ // This test calculates the levels for a nullable primitive array
+ let array = Arc::new(Int32Array::from_iter([
+ Some(0),
+ None,
+ Some(0),
+ Some(0),
+ None,
+ ])) as ArrayRef;
+ let field = Field::new("item", DataType::Int32, true);
+
+ let levels = calculate_array_levels(&array, &field).unwrap();
+ assert_eq!(levels.len(), 1);
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask.clone(),
- LevelType::Primitive(true),
- );
let expected_levels = LevelInfo {
- definition: vec![1, 0, 1, 1, 0],
- repetition: None,
- array_offsets,
- array_mask,
- max_definition: 1,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ def_levels: Some(vec![1, 0, 1, 1, 0]),
+ rep_levels: None,
+ non_null_indices: vec![0, 2, 3],
+ max_def_level: 1,
+ max_rep_level: 0,
};
- assert_eq!(&levels, &expected_levels);
+ assert_eq!(&levels[0], &expected_levels);
}
#[test]
fn test_calculate_array_levels_1() {
+ let leaf_field = Field::new("item", DataType::Int32, false);
+ let list_type = DataType::List(Box::new(leaf_field));
+
// if all array values are defined (e.g. batch<list<_>>)
// [[0], [1], [2], [3], [4]]
- let parent_levels = LevelInfo {
- definition: vec![0; 5],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4, 5],
- array_mask: vec![true, true, true, true, true],
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 0,
- length: 5,
+
+ let leaf_array = Int32Array::from_iter(0..5);
+ // Cannot use from_iter_primitive as always infers nullable
+ let offsets = Buffer::from_iter(0_i32..6);
+ let list = ArrayDataBuilder::new(list_type.clone())
+ .len(5)
+ .add_buffer(offsets)
+ .add_child_data(leaf_array.data().clone())
+ .build()
+ .unwrap();
+ let list = make_array(list);
+
+ let list_field = Field::new("list", list_type.clone(), false);
+ let levels = calculate_array_levels(&list, &list_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
+ let expected_levels = LevelInfo {
+ def_levels: Some(vec![1; 5]),
+ rep_levels: Some(vec![0; 5]),
+ non_null_indices: (0..5).collect(),
+ max_def_level: 1,
+ max_rep_level: 1,
};
- let array_offsets = vec![0, 2, 2, 4, 8, 11];
- let array_mask = vec![true, false, true, true, true];
+ assert_eq!(&levels[0], &expected_levels);
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(true),
- );
- // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
+ // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
// all values are defined as we do not have nulls on the root (batch)
// repetition:
// 0: 0, 1
- // 1:
+ // 1: 0
// 2: 0, 1
// 3: 0, 1, 1, 1
// 4: 0, 1, 1
+ let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
+ let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
+ let list = ArrayDataBuilder::new(list_type.clone())
+ .len(5)
+ .add_buffer(offsets)
+ .add_child_data(leaf_array.data().clone())
+ .null_bit_buffer(Some(Buffer::from([0b00011101])))
+ .build()
+ .unwrap();
+ let list = make_array(list);
+
+ let list_field = Field::new("list", list_type, true);
+ let levels = calculate_array_levels(&list, &list_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
let expected_levels = LevelInfo {
- // The levels are normally 2 because we:
- // - Calculate the level at the list
- // - Calculate the level at the list's child
- // We do not do this in these tests, thus the levels are 1 less.
- definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
- repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
- array_offsets,
- array_mask: vec![
- true, true, false, true, true, true, true, true, true, true, true, true,
- ],
- max_definition: 2,
- level_type: LevelType::List(true),
- offset: 0,
- length: 11, // the child has 11 slots
+ def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
+ rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
+ non_null_indices: (0..11).collect(),
+ max_def_level: 2,
+ max_rep_level: 1,
};
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- assert_eq!(&levels, &expected_levels);
+ assert_eq!(&levels[0], &expected_levels);
}
#[test]
@@ -1039,210 +656,200 @@ mod tests {
// If the first values of a list are null due to a parent, we have to still account for them
// while indexing, because they would affect the way the child is indexed
// i.e. in the above example, we have to know that [0, 1] has to be skipped
- let parent_levels = LevelInfo {
- definition: vec![0, 1, 0, 1, 1],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4, 5],
- array_mask: vec![false, true, false, true, true],
- max_definition: 1,
- level_type: LevelType::Struct(true),
- offset: 0,
- length: 5,
- };
- let array_offsets = vec![0, 2, 2, 4, 8, 11];
- let array_mask = vec![true, false, true, true, true];
+ let leaf = Int32Array::from_iter(0..11);
+ let leaf_field = Field::new("leaf", DataType::Int32, false);
+
+ let list_type = DataType::List(Box::new(leaf_field));
+ let list = ArrayData::builder(list_type.clone())
+ .len(5)
+ .add_child_data(leaf.data().clone())
+ .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
+ .build()
+ .unwrap();
+
+ let list = make_array(list);
+ let list_field = Field::new("list", list_type, true);
+
+ let struct_array =
+ StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
+ let array = Arc::new(struct_array) as ArrayRef;
+
+ let struct_field = Field::new("struct", array.data_type().clone(), true);
+
+ let levels = calculate_array_levels(&array, &struct_field).unwrap();
+ assert_eq!(levels.len(), 1);
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(true),
- );
let expected_levels = LevelInfo {
- // 0 1 [2] are 0 (not defined at level 1)
- // [2] is 1, but has 0 slots so is not populated (defined at level 1 only)
- // 2 3 [4] are 0
- // 4 5 6 7 [8] are 1 (defined at level 1 only)
- // 8 9 10 [11] are 2 (defined at both levels)
- definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
- repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
- array_offsets,
- array_mask: vec![
- false, false, false, false, false, true, true, true, true, true, true,
- true,
- ],
- max_definition: 3,
- level_type: LevelType::List(true),
- offset: 0,
- length: 11,
+ def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
+ rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
+ non_null_indices: (4..11).collect(),
+ max_def_level: 3,
+ max_rep_level: 1,
};
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- assert_eq!(&levels, &expected_levels);
-
- // nested lists (using previous test)
- let nested_parent_levels = levels;
- let array_offsets = vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22];
- let array_mask = vec![
- true, true, true, true, true, true, true, true, true, true, true,
- ];
- let levels = nested_parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(true),
- );
+
+ assert_eq!(&levels[0], &expected_levels);
+
+ // nested lists
+
+ // 0: [[100, 101], [102, 103]]
+ // 1: []
+ // 2: [[104, 105], [106, 107]]
+ // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
+ // 4: [[116, 117], [118, 119], [120, 121]]
+
+ let leaf = Int32Array::from_iter(100..122);
+ let leaf_field = Field::new("leaf", DataType::Int32, true);
+
+ let l1_type = DataType::List(Box::new(leaf_field));
+ let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
+ let l1 = ArrayData::builder(l1_type.clone())
+ .len(11)
+ .add_child_data(leaf.data().clone())
+ .add_buffer(offsets)
+ .build()
+ .unwrap();
+
+ let l1_field = Field::new("l1", l1_type, true);
+ let l2_type = DataType::List(Box::new(l1_field));
+ let l2 = ArrayData::builder(l2_type)
+ .len(5)
+ .add_child_data(l1)
+ .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
+ .build()
+ .unwrap();
+
+ let l2 = make_array(l2);
+ let l2_field = Field::new("l2", l2.data_type().clone(), true);
+
+ let levels = calculate_array_levels(&l2, &l2_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
let expected_levels = LevelInfo {
- // (def: 0) 0 1 [2] are 0 (take parent)
- // (def: 0) 2 3 [4] are 0 (take parent)
- // (def: 0) 4 5 [6] are 0 (take parent)
- // (def: 0) 6 7 [8] are 0 (take parent)
- // (def: 1) 8 9 [10] are 1 (take parent)
- // (def: 1) 10 11 [12] are 1 (take parent)
- // (def: 1) 12 23 [14] are 1 (take parent)
- // (def: 1) 14 15 [16] are 1 (take parent)
- // (def: 2) 16 17 [18] are 2 (defined at all levels)
- // (def: 2) 18 19 [20] are 2 (defined at all levels)
- // (def: 2) 20 21 [22] are 2 (defined at all levels)
- //
- // 0 1 [2] are 0 (not defined at level 1)
- // [2] is 1, but has 0 slots so is not populated (defined at level 1 only)
- // 2 3 [4] are 0
- // 4 5 6 7 [8] are 1 (defined at level 1 only)
- // 8 9 10 [11] are 2 (defined at both levels)
- //
- // 0: [[100, 101], [102, 103]]
- // 1: []
- // 2: [[104, 105], [106, 107]]
- // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
- // 4: [[116, 117], [118, 119], [120, 121]]
- definition: vec![
- 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
- ],
- repetition: Some(vec![
+ def_levels: Some(vec![
+ 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
+ ]),
+ rep_levels: Some(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
]),
- array_offsets,
- array_mask: vec![
- false, false, false, false, false, false, false, false, false, true,
- true, true, true, true, true, true, true, true, true, true, true, true,
- true,
- ],
- max_definition: 5,
- level_type: LevelType::List(true),
- offset: 0,
- length: 22,
+ non_null_indices: (0..22).collect(),
+ max_def_level: 5,
+ max_rep_level: 2,
};
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.array_mask, &expected_levels.array_mask);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- assert_eq!(&levels, &expected_levels);
+
+ assert_eq!(&levels[0], &expected_levels);
}
#[test]
fn test_calculate_array_levels_nested_list() {
+ let leaf_field = Field::new("leaf", DataType::Int32, false);
+ let list_type = DataType::List(Box::new(leaf_field));
+
// if all array values are defined (e.g. batch<list<_>>)
// The array at this level looks like:
// 0: [a]
// 1: [a]
// 2: [a]
// 3: [a]
- let parent_levels = LevelInfo {
- definition: vec![1, 1, 1, 1],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4],
- array_mask: vec![true, true, true, true],
- max_definition: 1,
- level_type: LevelType::Struct(true),
- offset: 0,
- length: 4,
+
+ let leaf = Int32Array::from_iter([0; 4]);
+ let list = ArrayData::builder(list_type.clone())
+ .len(4)
+ .add_buffer(Buffer::from_iter(0_i32..5))
+ .add_child_data(leaf.data().clone())
+ .build()
+ .unwrap();
+ let list = make_array(list);
+
+ let list_field = Field::new("list", list_type.clone(), false);
+ let levels = calculate_array_levels(&list, &list_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
+ let expected_levels = LevelInfo {
+ def_levels: Some(vec![1; 4]),
+ rep_levels: Some(vec![0; 4]),
+ non_null_indices: (0..4).collect(),
+ max_def_level: 1,
+ max_rep_level: 1,
};
- // 0: null ([], but mask is false, so it's not just an empty list)
- // 1: [1, 2, 3]
- // 2: [4, 5]
- // 3: [6, 7]
- let array_offsets = vec![0, 1, 4, 6, 8];
- let array_mask = vec![false, true, true, true];
+ assert_eq!(&levels[0], &expected_levels);
- let levels = parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(true),
- );
- // 0: [null], level 1 is defined, but not 2
+ // 0: null
// 1: [1, 2, 3]
// 2: [4, 5]
// 3: [6, 7]
+ let leaf = Int32Array::from_iter(0..8);
+ let list = ArrayData::builder(list_type.clone())
+ .len(4)
+ .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
+ .null_bit_buffer(Some(Buffer::from([0b00001110])))
+ .add_child_data(leaf.data().clone())
+ .build()
+ .unwrap();
+ let list = make_array(list);
+ let list_field = Field::new("list", list_type, true);
+
+ let struct_array = StructArray::from(vec![(list_field, list)]);
+ let array = Arc::new(struct_array) as ArrayRef;
+
+ let struct_field = Field::new("struct", array.data_type().clone(), true);
+ let levels = calculate_array_levels(&array, &struct_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
let expected_levels = LevelInfo {
- definition: vec![1, 3, 3, 3, 3, 3, 3, 3],
- repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
- array_offsets,
- array_mask: vec![false, true, true, true, true, true, true, true],
- max_definition: 3,
- level_type: LevelType::List(true),
- offset: 0,
- length: 8,
+ def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
+ rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
+ non_null_indices: (0..7).collect(),
+ max_def_level: 3,
+ max_rep_level: 1,
};
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- assert_eq!(&levels, &expected_levels);
-
- // nested lists (using previous test)
- let nested_parent_levels = levels;
- // 0: [null] (was a populated null slot at the parent)
- // 1: [201]
- // 2: [202, 203]
- // 3: null ([])
- // 4: [204, 205, 206]
- // 5: [207, 208, 209, 210]
- // 6: [] (tests a non-null empty list slot)
- // 7: [211, 212, 213, 214, 215]
- let array_offsets = vec![0, 1, 2, 4, 4, 7, 11, 11, 16];
- // logically, the fist slot of the mask is false
- let array_mask = vec![true, true, true, false, true, true, true, true];
- let levels = nested_parent_levels.calculate_child_levels(
- array_offsets.clone(),
- array_mask,
- LevelType::List(true),
- );
- // We have 7 array values, and at least 15 primitives (from array_offsets)
- // 0: (-)[null], parent was null, no value populated here
- // 1: (0)[201], (1)[202, 203], (2)[[null]]
- // 2: (3)[204, 205, 206], (4)[207, 208, 209, 210]
- // 3: (5)[[]], (6)[211, 212, 213, 214, 215]
- //
+ assert_eq!(&levels[0], &expected_levels);
+
+ // nested lists
// In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
- // 0: {"struct": [ null ]}
+ // 0: {"struct": null }
// 1: {"struct": [ [201], [202, 203], [] ]}
// 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
// 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
+
+ let leaf = Int32Array::from_iter(201..216);
+ let leaf_field = Field::new("leaf", DataType::Int32, false);
+ let list_1_type = DataType::List(Box::new(leaf_field));
+ let list_1 = ArrayData::builder(list_1_type.clone())
+ .len(7)
+ .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
+ .add_child_data(leaf.data().clone())
+ .build()
+ .unwrap();
+
+ let list_1_field = Field::new("l1", list_1_type, true);
+ let list_2_type = DataType::List(Box::new(list_1_field));
+ let list_2 = ArrayData::builder(list_2_type.clone())
+ .len(4)
+ .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
+ .null_bit_buffer(Some(Buffer::from([0b00001110])))
+ .add_child_data(list_1)
+ .build()
+ .unwrap();
+
+ let list_2 = make_array(list_2);
+ let list_2_field = Field::new("list_2", list_2_type, true);
+
+ let struct_array =
+ StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
+ let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
+
+ let array = Arc::new(struct_array) as ArrayRef;
+ let levels = calculate_array_levels(&array, &struct_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
let expected_levels = LevelInfo {
- definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5],
- repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
- array_mask: vec![
- false, true, true, true, false, true, true, true, true, true, true, true,
- true, true, true, true, true, true,
- ],
- array_offsets,
- max_definition: 5,
- level_type: LevelType::List(true),
- offset: 0,
- length: 16,
+ def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
+ rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
+ non_null_indices: (0..15).collect(),
+ max_def_level: 5,
+ max_rep_level: 2,
};
- assert_eq!(&levels.definition, &expected_levels.definition);
- assert_eq!(&levels.repetition, &expected_levels.repetition);
- assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
- assert_eq!(&levels.array_mask, &expected_levels.array_mask);
- assert_eq!(&levels.max_definition, &expected_levels.max_definition);
- assert_eq!(&levels.level_type, &expected_levels.level_type);
- assert_eq!(&levels, &expected_levels);
+ assert_eq!(&levels[0], &expected_levels);
}
#[test]
@@ -1255,54 +862,34 @@ mod tests {
// - {a: {b: null}}
// - {a: null}
// - {a: {b: {c: 6}}}
- let a_levels = LevelInfo {
- definition: vec![1, 1, 1, 1, 0, 1],
- repetition: None,
- array_offsets: (0..=6).collect(),
- array_mask: vec![true, true, true, true, false, true],
- max_definition: 1,
- level_type: LevelType::Struct(true),
- offset: 0,
- length: 6,
- };
- // b's offset and mask
- let b_offsets: Vec<i64> = (0..=6).collect();
- let b_mask = vec![true, true, true, false, false, true];
- // b's expected levels
- let b_expected_levels = LevelInfo {
- definition: vec![2, 2, 2, 1, 0, 2],
- repetition: None,
- array_offsets: (0..=6).collect(),
- array_mask: vec![true, true, true, false, false, true],
- max_definition: 2,
- level_type: LevelType::Struct(true),
- offset: 0,
- length: 6,
- };
- let b_levels = a_levels.calculate_child_levels(
- b_offsets.clone(),
- b_mask,
- LevelType::Struct(true),
- );
- assert_eq!(&b_expected_levels, &b_levels);
-
- // c's offset and mask
- let c_offsets = b_offsets;
- let c_mask = vec![true, false, true, false, false, true];
- // c's expected levels
- let c_expected_levels = LevelInfo {
- definition: vec![3, 2, 3, 1, 0, 3],
- repetition: None,
- array_offsets: c_offsets.clone(),
- array_mask: vec![true, false, true, false, false, true],
- max_definition: 3,
- level_type: LevelType::Struct(true),
- offset: 0,
- length: 6,
+
+ let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
+ let c_field = Field::new("c", DataType::Int32, true);
+ let b = StructArray::from((
+ (vec![(c_field, Arc::new(c) as ArrayRef)]),
+ Buffer::from([0b00110111]),
+ ));
+
+ let b_field = Field::new("b", b.data_type().clone(), true);
+ let a = StructArray::from((
+ (vec![(b_field, Arc::new(b) as ArrayRef)]),
+ Buffer::from([0b00101111]),
+ ));
+
+ let a_field = Field::new("a", a.data_type().clone(), true);
+ let a_array = Arc::new(a) as ArrayRef;
+
+ let levels = calculate_array_levels(&a_array, &a_field).unwrap();
+ assert_eq!(levels.len(), 1);
+
+ let expected_levels = LevelInfo {
+ def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
+ rep_levels: None,
+ non_null_indices: vec![0, 2, 5],
+ max_def_level: 3,
+ max_rep_level: 0,
};
- let c_levels =
- b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true));
- assert_eq!(&c_expected_levels, &c_levels);
+ assert_eq!(&levels[0], &expected_levels);
}
#[test]
@@ -1310,8 +897,7 @@ mod tests {
// this tests the level generation from the arrow_writer equivalent test
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
- let a_value_offsets =
- arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+ let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
let a_list_type =
DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
let a_list_data = ArrayData::builder(a_list_type.clone())
@@ -1325,56 +911,25 @@ mod tests {
assert_eq!(a_list_data.null_count(), 1);
let a = ListArray::from(a_list_data);
- let values = Arc::new(a);
+ let values = Arc::new(a) as _;
- let schema = Schema::new(vec![Field::new("item", a_list_type, true)]);
+ let item_field = Field::new("item", a_list_type, true);
+ let mut builder =
+ LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+ builder.write(&values, 2..4);
+ let levels = builder.finish();
- let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
-
- let expected_batch_level = LevelInfo {
- definition: vec![0; 2],
- repetition: None,
- array_offsets: (0..=2).collect(),
- array_mask: vec![true, true],
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 2,
- length: 2,
- };
-
- let batch_level = LevelInfo::new(2, 2);
- assert_eq!(&batch_level, &expected_batch_level);
-
- // calculate the list's level
- let mut levels = vec![];
- batch
- .columns()
- .iter()
- .zip(batch.schema().fields())
- .for_each(|(array, field)| {
- let mut array_levels = batch_level.calculate_array_levels(array, field);
- levels.append(&mut array_levels);
- });
assert_eq!(levels.len(), 1);
let list_level = levels.get(0).unwrap();
let expected_level = LevelInfo {
- definition: vec![0, 3, 3, 3],
- repetition: Some(vec![0, 0, 1, 1]),
- array_offsets: vec![3, 3, 6],
- array_mask: vec![false, true, true, true],
- max_definition: 3,
- level_type: LevelType::Primitive(true),
- offset: 3,
- length: 3,
+ def_levels: Some(vec![0, 3, 3, 3]),
+ rep_levels: Some(vec![0, 0, 1, 1]),
+ non_null_indices: vec![3, 4, 5],
+ max_def_level: 3,
+ max_rep_level: 1,
};
- assert_eq!(&list_level.definition, &expected_level.definition);
- assert_eq!(&list_level.repetition, &expected_level.repetition);
- assert_eq!(&list_level.array_offsets, &expected_level.array_offsets);
- assert_eq!(&list_level.array_mask, &expected_level.array_mask);
- assert_eq!(&list_level.max_definition, &expected_level.max_definition);
- assert_eq!(&list_level.level_type, &expected_level.level_type);
assert_eq!(list_level, &expected_level);
}
@@ -1445,20 +1000,6 @@ mod tests {
.unwrap();
//////////////////////////////////////////////
- let expected_batch_level = LevelInfo {
- definition: vec![0; 5],
- repetition: None,
- array_offsets: (0..=5).collect(),
- array_mask: vec![true, true, true, true, true],
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 0,
- length: 5,
- };
-
- let batch_level = LevelInfo::new(0, 5);
- assert_eq!(&batch_level, &expected_batch_level);
-
// calculate the list's level
let mut levels = vec![];
batch
@@ -1466,7 +1007,7 @@ mod tests {
.iter()
.zip(batch.schema().fields())
.for_each(|(array, field)| {
- let mut array_levels = batch_level.calculate_array_levels(array, field);
+ let mut array_levels = calculate_array_levels(array, field).unwrap();
levels.append(&mut array_levels);
});
assert_eq!(levels.len(), 5);
@@ -1475,14 +1016,11 @@ mod tests {
let list_level = levels.get(0).unwrap();
let expected_level = LevelInfo {
- definition: vec![0, 0, 0, 0, 0],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4, 5],
- array_mask: vec![true, true, true, true, true],
- max_definition: 0,
- level_type: LevelType::Primitive(false),
- offset: 0,
- length: 5,
+ def_levels: None,
+ rep_levels: None,
+ non_null_indices: vec![0, 1, 2, 3, 4],
+ max_def_level: 0,
+ max_rep_level: 0,
};
assert_eq!(list_level, &expected_level);
@@ -1490,14 +1028,11 @@ mod tests {
let list_level = levels.get(1).unwrap();
let expected_level = LevelInfo {
- definition: vec![1, 0, 0, 1, 1],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4, 5],
- array_mask: vec![true, false, false, true, true],
- max_definition: 1,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ def_levels: Some(vec![1, 0, 0, 1, 1]),
+ rep_levels: None,
+ non_null_indices: vec![0, 3, 4],
+ max_def_level: 1,
+ max_rep_level: 0,
};
assert_eq!(list_level, &expected_level);
@@ -1505,14 +1040,11 @@ mod tests {
let list_level = levels.get(2).unwrap();
let expected_level = LevelInfo {
- definition: vec![1, 1, 1, 2, 1],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4, 5],
- array_mask: vec![false, false, false, true, false],
- max_definition: 2,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ def_levels: Some(vec![1, 1, 1, 2, 1]),
+ rep_levels: None,
+ non_null_indices: vec![3],
+ max_def_level: 2,
+ max_rep_level: 0,
};
assert_eq!(list_level, &expected_level);
@@ -1520,36 +1052,15 @@ mod tests {
let list_level = levels.get(3).unwrap();
let expected_level = LevelInfo {
- definition: vec![3, 2, 3, 2, 3],
- repetition: None,
- array_offsets: vec![0, 1, 2, 3, 4, 5],
- array_mask: vec![true, false, true, false, true],
- max_definition: 3,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ def_levels: Some(vec![3, 2, 3, 2, 3]),
+ rep_levels: None,
+ non_null_indices: vec![0, 2, 4],
+ max_def_level: 3,
+ max_rep_level: 0,
};
assert_eq!(list_level, &expected_level);
}
- #[test]
- fn test_filter_array_indices() {
- let level = LevelInfo {
- definition: vec![3, 3, 3, 1, 3, 3, 3],
- repetition: Some(vec![0, 1, 1, 0, 0, 1, 1]),
- array_offsets: vec![0, 3, 3, 6],
- array_mask: vec![true, true, true, false, true, true, true],
- max_definition: 3,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 6,
- };
-
- let expected = vec![0, 1, 2, 3, 4, 5];
- let filter = level.filter_array_indices();
- assert_eq!(expected, filter);
- }
-
#[test]
fn test_null_vs_nonnull_struct() {
// define schema
@@ -1571,9 +1082,8 @@ mod tests {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
.unwrap();
- let batch_level = LevelInfo::new(0, batch.num_rows());
let struct_null_level =
- batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
+ calculate_array_levels(batch.column(0), batch.schema().field(0));
// create second batch
// define schema
@@ -1595,9 +1105,8 @@ mod tests {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
.unwrap();
- let batch_level = LevelInfo::new(0, batch.num_rows());
let struct_non_null_level =
- batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
+ calculate_array_levels(batch.column(0), batch.schema().field(0));
// The 2 levels should not be the same
if struct_non_null_level == struct_null_level {
@@ -1634,20 +1143,6 @@ mod tests {
let batch = reader.next().unwrap().unwrap();
- let expected_batch_level = LevelInfo {
- definition: vec![0; 3],
- repetition: None,
- array_offsets: (0..=3).collect(),
- array_mask: vec![true, true, true],
- max_definition: 0,
- level_type: LevelType::Root,
- offset: 0,
- length: 3,
- };
-
- let batch_level = LevelInfo::new(0, 3);
- assert_eq!(&batch_level, &expected_batch_level);
-
// calculate the map's level
let mut levels = vec![];
batch
@@ -1655,7 +1150,7 @@ mod tests {
.iter()
.zip(batch.schema().fields())
.for_each(|(array, field)| {
- let mut array_levels = batch_level.calculate_array_levels(array, field);
+ let mut array_levels = calculate_array_levels(array, field).unwrap();
levels.append(&mut array_levels);
});
assert_eq!(levels.len(), 2);
@@ -1664,14 +1159,11 @@ mod tests {
let list_level = levels.get(0).unwrap();
let expected_level = LevelInfo {
- definition: vec![1; 7],
- repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]),
- array_offsets: vec![0, 2, 4, 7],
- array_mask: vec![true; 7],
- max_definition: 1,
- level_type: LevelType::Primitive(false),
- offset: 0,
- length: 7,
+ def_levels: Some(vec![1; 7]),
+ rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+ non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
+ max_def_level: 1,
+ max_rep_level: 1,
};
assert_eq!(list_level, &expected_level);
@@ -1679,14 +1171,11 @@ mod tests {
let list_level = levels.get(1).unwrap();
let expected_level = LevelInfo {
- definition: vec![2, 2, 2, 1, 2, 1, 2],
- repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]),
- array_offsets: vec![0, 2, 4, 7],
- array_mask: vec![true, true, true, false, true, false, true],
- max_definition: 2,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 7,
+ def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
+ rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+ non_null_indices: vec![0, 1, 2, 4, 6],
+ max_def_level: 2,
+ max_rep_level: 1,
};
assert_eq!(list_level, &expected_level);
}
@@ -1760,63 +1249,155 @@ mod tests {
let array = Arc::new(list_builder.finish());
+ let values_len = array.data().child_data()[0].len();
+ assert_eq!(values_len, 5);
+
let schema = Arc::new(Schema::new(vec![list_field]));
let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
- let batch_level = LevelInfo::new(0, rb.num_rows());
- let list_level =
- &batch_level.calculate_array_levels(rb.column(0), rb.schema().field(0))[0];
+ let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
+ let list_level = &levels[0];
let expected_level = LevelInfo {
- definition: vec![4, 1, 0, 2, 2, 3, 4],
- repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
- array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
- array_mask: vec![true, true, false, false, false, false, true],
- max_definition: 4,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
+ rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
+ non_null_indices: vec![0, 4],
+ max_def_level: 4,
+ max_rep_level: 1,
};
assert_eq!(list_level, &expected_level);
}
#[test]
- fn test_nested_indices() {
- // Given a buffer like
- // [0, null, null, 1, 2]
- //
- // The two level infos below might represent the two structures
- // 1: [{a: 0}], [], null, [null, null], [{a: 1}], [{a: 2}]
- // 2: [0], [], null, [null, null], [1], [2]
- //
- // (That is, their only difference is that the leaf values are nested one level deeper in a
- // struct).
-
- let level1 = LevelInfo {
- definition: vec![4, 1, 0, 2, 2, 4, 4],
- repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
- array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
- array_mask: vec![true, true, false, false, false, false, true],
- max_definition: 4,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ fn test_struct_mask_list() {
+ // Test the null mask of a struct array masking out non-empty slices of a child ListArray
+ let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+ Some(vec![Some(1), Some(2)]),
+ Some(vec![None]),
+ Some(vec![]),
+ Some(vec![Some(3), None]), // Masked by struct array
+ Some(vec![Some(4), Some(5)]),
+ None, // Masked by struct array
+ None,
+ ]);
+
+ // This test assumes that nulls don't take up space
+ assert_eq!(inner.data().child_data()[0].len(), 7);
+
+ let field = Field::new("list", inner.data_type().clone(), true);
+ let array = Arc::new(inner) as ArrayRef;
+ let nulls = Buffer::from([0b01010111]);
+ let struct_a = StructArray::from((vec![(field, array)], nulls));
+
+ let field = Field::new("struct", struct_a.data_type().clone(), true);
+ let array = Arc::new(struct_a) as ArrayRef;
+ let levels = calculate_array_levels(&array, &field).unwrap();
+
+ assert_eq!(levels.len(), 1);
+
+ let expected_level = LevelInfo {
+ def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
+ rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
+ non_null_indices: vec![0, 1, 5, 6],
+ max_def_level: 4,
+ max_rep_level: 1,
};
- let level2 = LevelInfo {
- definition: vec![3, 1, 0, 2, 2, 3, 3],
- repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]),
- array_offsets: vec![0, 1, 1, 1, 3, 4, 5],
- array_mask: vec![true, true, false, false, false, false, true],
- max_definition: 3,
- level_type: LevelType::Primitive(true),
- offset: 0,
- length: 5,
+ assert_eq!(&levels[0], &expected_level);
+ }
+
+ #[test]
+ fn test_list_mask_struct() {
+ // Test the null mask of a struct array and the null mask of a list array
+ // masking out non-null elements of their children
+
+ let a1 = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+ Some(vec![None]), // Masked by list array
+ Some(vec![]), // Masked by list array
+ Some(vec![Some(3), None]),
+ Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
+ None,
+ None,
+ ])) as ArrayRef;
+
+ let a2 = Arc::new(Int32Array::from_iter(vec![
+ Some(1), // Masked by list array
+ Some(2), // Masked by list array
+ None,
+ Some(4), // Masked by struct array
+ Some(5),
+ None,
+ ])) as ArrayRef;
+
+ let field_a1 = Field::new("list", a1.data_type().clone(), true);
+ let field_a2 = Field::new("integers", a2.data_type().clone(), true);
+
+ let nulls = Buffer::from([0b00110111]);
+ let struct_a = Arc::new(
+ StructArray::try_from((vec![(field_a1, a1), (field_a2, a2)], nulls)).unwrap(),
+ ) as ArrayRef;
+
+ let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
+ let nulls = Buffer::from([0b00111100]);
+
+ let list_type = DataType::List(Box::new(Field::new(
+ "struct",
+ struct_a.data_type().clone(),
+ true,
+ )));
+
+ let data = ArrayDataBuilder::new(list_type.clone())
+ .len(6)
+ .null_bit_buffer(Some(nulls))
+ .add_buffer(offsets)
+ .add_child_data(struct_a.data().clone())
+ .build()
+ .unwrap();
+
+ let list = make_array(data);
+ let list_field = Field::new("col", list_type, true);
+
+ let expected = vec![
+ r#"+-------------------------------------+"#,
+ r#"| col |"#,
+ r#"+-------------------------------------+"#,
+ r#"| |"#,
+ r#"| |"#,
+ r#"| [] |"#,
+ r#"| [{"list": [3, ], "integers": null}] |"#,
+ r#"| [, {"list": null, "integers": 5}] |"#,
+ r#"| [] |"#,
+ r#"+-------------------------------------+"#,
+ ]
+ .join("\n");
+
+ let pretty = pretty_format_columns(list_field.name(), &[list.clone()]).unwrap();
+ assert_eq!(pretty.to_string(), expected);
+
+ let levels = calculate_array_levels(&list, &list_field).unwrap();
+
+ assert_eq!(levels.len(), 2);
+
+ let expected_level = LevelInfo {
+ def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
+ rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
+ non_null_indices: vec![1],
+ max_def_level: 6,
+ max_rep_level: 2,
+ };
+
+ assert_eq!(&levels[0], &expected_level);
+
+ let expected_level = LevelInfo {
+ def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
+ rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
+ non_null_indices: vec![4],
+ max_def_level: 4,
+ max_rep_level: 1,
};
- // filter_array_indices should return the same indices in this case.
- assert_eq!(level1.filter_array_indices(), level2.filter_array_indices());
+ assert_eq!(&levels[1], &expected_level);
}
}