You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/24 17:17:29 UTC
[arrow-rs] branch master updated: Parquet Reader/writer for fixed-size list arrays (#4267)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 741244da7 Parquet Reader/writer for fixed-size list arrays (#4267)
741244da7 is described below
commit 741244da7846fdcb0b34d24ea90e77025863a88c
Author: Dexter Duckworth <de...@users.noreply.github.com>
AuthorDate: Wed May 24 13:17:22 2023 -0400
Parquet Reader/writer for fixed-size list arrays (#4267)
* Initial implementation for writing fixed-size lists to Parquet.
The implementation still needs tests.
The implementation uses a new `write_fixed_size_list` method instead of `write_list`.
This is done to avoid the overhead of needlessly calculating list offsets.
* Initial implementation for reading fixed-size lists from Parquet.
The implementation still needs tests.
* Added tests for fixed-size list writer.
Fixed bugs in implementation found via tests.
* Added tests for fixed-size list reader.
Fixed bugs in implementation found via tests.
* Added correct behavior for writing empty fixed-size lists.
Writer now emits the correct definition levels for empty lists.
Added empty list unit test.
* Added correct behavior for reading empty fixed-size lists.
Reader now handles empty list definition levels correctly.
Added empty list unit test.
* Fixed linter warnings.
* Added license header to fixed_size_list_array.rs
* Added fixed-size list reader tests from PR review.
* Added fixed-size list reader row length sanity checks.
* Simplified fixed-size list case in LevelInfoBuilder constructor.
* Removed dynamic dispatch inside fixed-length list writer.
* Expanded list of structs test for fixed-size list writer.
* Reverted expected levels in fixed-size list writer test.
* Fixed linter warnings.
* Updated list size check in fixed-size list reader.
Converted the check to return an error instead of panicking.
* Small tweak to row length check in fixed-size list reader.
* Fixed bug in fixed-size list level encoding.
Writer now correctly handles child arrays with variable row length.
Added new unit test to verify the new behavior is correct.
* Added fixed-size list reader test.
Test verifies that reader handles child arrays with variable length correctly.
---
parquet/src/arrow/array_reader/builder.rs | 44 +-
.../arrow/array_reader/fixed_size_list_array.rs | 688 +++++++++++++++++++++
parquet/src/arrow/array_reader/mod.rs | 2 +
parquet/src/arrow/arrow_writer/levels.rs | 368 ++++++++++-
parquet/src/arrow/arrow_writer/mod.rs | 16 +-
5 files changed, 1112 insertions(+), 6 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 241a5efe0..5e0d05e89 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -23,8 +23,8 @@ use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
- ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader,
- RowGroupCollection, StructArrayReader,
+ FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
+ PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
@@ -63,6 +63,9 @@ fn build_reader(
DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
DataType::List(_) => build_list_reader(field, mask, false, row_groups),
DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
+ DataType::FixedSizeList(_, _) => {
+ build_fixed_size_list_reader(field, mask, row_groups)
+ }
d => unimplemented!("reading group type {} not implemented", d),
},
}
@@ -166,6 +169,43 @@ fn build_list_reader(
Ok(reader)
}
+/// Build array reader for fixed-size list type.
+fn build_fixed_size_list_reader(
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ row_groups: &dyn RowGroupCollection,
+) -> Result<Option<Box<dyn ArrayReader>>> {
+ let children = field.children().unwrap();
+ assert_eq!(children.len(), 1);
+
+ let reader = match build_reader(&children[0], mask, row_groups)? {
+ Some(item_reader) => {
+ let item_type = item_reader.get_data_type().clone();
+ let reader = match &field.arrow_type {
+ &DataType::FixedSizeList(ref f, size) => {
+ let data_type = DataType::FixedSizeList(
+ Arc::new(f.as_ref().clone().with_data_type(item_type)),
+ size,
+ );
+
+ Box::new(FixedSizeListArrayReader::new(
+ item_reader,
+ size as usize,
+ data_type,
+ field.def_level,
+ field.rep_level,
+ field.nullable,
+ )) as _
+ }
+ _ => unimplemented!(),
+ };
+ Some(reader)
+ }
+ None => None,
+ };
+ Ok(reader)
+}
+
/// Creates primitive array reader for each primitive type.
fn build_primitive_reader(
field: &ParquetField,
diff --git a/parquet/src/arrow/array_reader/fixed_size_list_array.rs b/parquet/src/arrow/array_reader/fixed_size_list_array.rs
new file mode 100644
index 000000000..4cf68a066
--- /dev/null
+++ b/parquet/src/arrow/array_reader/fixed_size_list_array.rs
@@ -0,0 +1,688 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+ item_reader: Box<dyn ArrayReader>,
+ /// The number of child items in each row of the list array
+ fixed_size: usize,
+ data_type: ArrowType,
+ /// The definition level at which this list is not null
+ def_level: i16,
+ /// The repetition level that corresponds to a new value in this array
+ rep_level: i16,
+ /// If the list is nullable
+ nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+ /// Construct fixed-size list array reader.
+ pub fn new(
+ item_reader: Box<dyn ArrayReader>,
+ fixed_size: usize,
+ data_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ nullable: bool,
+ ) -> Self {
+ Self {
+ item_reader,
+ fixed_size,
+ data_type,
+ def_level,
+ rep_level,
+ nullable,
+ }
+ }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+ let size = self.item_reader.read_records(batch_size)?;
+ Ok(size)
+ }
+
+ fn consume_batch(&mut self) -> Result<ArrayRef> {
+ let next_batch_array = self.item_reader.consume_batch()?;
+ if next_batch_array.len() == 0 {
+ return Ok(new_empty_array(&self.data_type));
+ }
+
+ let def_levels = self
+ .get_def_levels()
+ .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+ let rep_levels = self
+ .get_rep_levels()
+ .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+ if !rep_levels.is_empty() && rep_levels[0] != 0 {
+ // This implies either the source data was invalid, or the leaf column
+ // reader did not correctly delimit semantic records
+ return Err(general_err!("first repetition level of batch must be 0"));
+ }
+
+ let mut validity = self
+ .nullable
+ .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+ let data = next_batch_array.to_data();
+ let mut child_data_builder =
+ MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+ // The current index into the child array entries
+ let mut child_idx = 0;
+ // The total number of rows (valid and invalid) in the list array
+ let mut list_len = 0;
+ // Start of the current run of valid values
+ let mut start_idx = None;
+ let mut row_len = 0;
+
+ def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+ match r.cmp(&self.rep_level) {
+ Ordering::Greater => {
+ // Repetition level greater than current => already handled by inner array
+ if *d < self.def_level {
+ return Err(general_err!(
+ "Encountered repetition level too large for definition level"
+ ));
+ }
+ }
+ Ordering::Equal => {
+ // Item inside of the current list
+ child_idx += 1;
+ row_len += 1;
+ }
+ Ordering::Less => {
+ // Start of new list row
+ list_len += 1;
+
+ // Length of the previous row should be equal to:
+ // - the list's fixed size (valid entries)
+ // - zero (null entries, start of array)
+ // Any other length indicates invalid data
+ if start_idx.is_some() && row_len != self.fixed_size {
+ return Err(general_err!(
+ "Encountered misaligned row with length {} (expected length {})",
+ row_len,
+ self.fixed_size
+ ))
+ }
+ row_len = 0;
+
+ if *d >= self.def_level {
+ row_len += 1;
+
+ // Valid list entry
+ if let Some(validity) = validity.as_mut() {
+ validity.append(true);
+ }
+ // Start a run of valid rows if not already inside of one
+ start_idx.get_or_insert(child_idx);
+ } else {
+ // Null list entry
+
+ if let Some(start) = start_idx.take() {
+ // Flush pending child items
+ child_data_builder.extend(0, start, child_idx);
+ }
+ // Pad list with nulls
+ child_data_builder.extend_nulls(self.fixed_size);
+
+ if let Some(validity) = validity.as_mut() {
+ // Valid if empty list
+ validity.append(*d + 1 == self.def_level);
+ }
+ }
+ child_idx += 1;
+ }
+ }
+ Ok(())
+ })?;
+
+ let child_data = match start_idx {
+ Some(0) => {
+ // No null entries - can reuse original array
+ next_batch_array.to_data()
+ }
+ Some(start) => {
+ // Flush pending child items
+ child_data_builder.extend(0, start, child_idx);
+ child_data_builder.freeze()
+ }
+ None => child_data_builder.freeze(),
+ };
+
+ // Verify total number of elements is aligned with fixed list size
+ if list_len * self.fixed_size != child_data.len() {
+ return Err(general_err!(
+ "fixed-size list length must be a multiple of {} but array contains {} elements",
+ self.fixed_size,
+ child_data.len()
+ ));
+ }
+
+ let mut list_builder = ArrayData::builder(self.get_data_type().clone())
+ .len(list_len)
+ .add_child_data(child_data);
+
+ if let Some(builder) = validity {
+ list_builder = list_builder.null_bit_buffer(Some(builder.into()));
+ }
+
+ let list_data = unsafe { list_builder.build_unchecked() };
+
+ let result_array = FixedSizeListArray::from(list_data);
+ Ok(Arc::new(result_array))
+ }
+
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ self.item_reader.skip_records(num_records)
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.item_reader.get_def_levels()
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.item_reader.get_rep_levels()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::arrow::{
+ array_reader::{test_util::InMemoryArrayReader, ListArrayReader},
+ arrow_reader::{
+ ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
+ },
+ ArrowWriter,
+ };
+ use arrow::datatypes::{Field, Int32Type};
+ use arrow_array::{
+ builder::{FixedSizeListBuilder, Int32Builder, ListBuilder},
+ cast::AsArray,
+ FixedSizeListArray, ListArray, PrimitiveArray, RecordBatch,
+ };
+ use arrow_buffer::Buffer;
+ use arrow_data::ArrayDataBuilder;
+ use arrow_schema::Schema;
+ use bytes::Bytes;
+
+ #[test]
+ fn test_nullable_list() {
+ // [null, [1, null, 2], null, [3, 4, 5], [null, null, null]]
+ let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ vec![
+ None,
+ Some([Some(1), None, Some(2)]),
+ None,
+ Some([Some(3), Some(4), Some(5)]),
+ Some([None, None, None]),
+ ],
+ 3,
+ );
+
+ let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+ None,
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ Some(4),
+ Some(5),
+ None,
+ None,
+ None,
+ ]));
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2]),
+ Some(vec![0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1]),
+ );
+
+ let mut list_array_reader = FixedSizeListArrayReader::new(
+ Box::new(item_array_reader),
+ 3,
+ ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", ArrowType::Int32, true)),
+ 3,
+ ),
+ 2,
+ 1,
+ true,
+ );
+ let actual = list_array_reader.next_batch(1024).unwrap();
+ let actual = actual
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .unwrap();
+ assert_eq!(&expected, actual)
+ }
+
+ #[test]
+ fn test_required_list() {
+ // [[1, null], [2, 3], [null, null], [4, 5]]
+ let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ vec![
+ Some([Some(1), None]),
+ Some([Some(2), Some(3)]),
+ Some([None, None]),
+ Some([Some(4), Some(5)]),
+ ],
+ 2,
+ );
+
+ let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ Some(3),
+ None,
+ None,
+ Some(4),
+ Some(5),
+ ]));
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![2, 1, 2, 2, 1, 1, 2, 2]),
+ Some(vec![0, 1, 0, 1, 0, 1, 0, 1]),
+ );
+
+ let mut list_array_reader = FixedSizeListArrayReader::new(
+ Box::new(item_array_reader),
+ 2,
+ ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", ArrowType::Int32, true)),
+ 2,
+ ),
+ 1,
+ 1,
+ false,
+ );
+ let actual = list_array_reader.next_batch(1024).unwrap();
+ let actual = actual
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .unwrap();
+ assert_eq!(&expected, actual)
+ }
+
+ #[test]
+ fn test_nested_list() {
+ // [
+ // null,
+ // [[1, 2]],
+ // [[null, 3]],
+ // null,
+ // [[4, 5]],
+ // [[null, null]],
+ // ]
+ let l2_type = ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", ArrowType::Int32, true)),
+ 2,
+ );
+ let l1_type = ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", l2_type.clone(), false)),
+ 1,
+ );
+
+ let array = PrimitiveArray::<Int32Type>::from(vec![
+ None,
+ None,
+ Some(1),
+ Some(2),
+ None,
+ Some(3),
+ None,
+ None,
+ Some(4),
+ Some(5),
+ None,
+ None,
+ ]);
+
+ let l2 = ArrayDataBuilder::new(l2_type.clone())
+ .len(6)
+ .add_child_data(array.into_data())
+ .build()
+ .unwrap();
+
+ let l1 = ArrayDataBuilder::new(l1_type.clone())
+ .len(6)
+ .add_child_data(l2)
+ .null_bit_buffer(Some(Buffer::from([0b110110])))
+ .build()
+ .unwrap();
+
+ let expected = FixedSizeListArray::from(l1);
+
+ let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+ None,
+ Some(1),
+ Some(2),
+ None,
+ Some(3),
+ None,
+ Some(4),
+ Some(5),
+ None,
+ None,
+ ]));
+
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ values,
+ Some(vec![0, 5, 5, 4, 5, 0, 5, 5, 4, 4]),
+ Some(vec![0, 0, 2, 0, 2, 0, 0, 2, 0, 2]),
+ );
+
+ let l2 = FixedSizeListArrayReader::new(
+ Box::new(item_array_reader),
+ 2,
+ l2_type,
+ 4,
+ 2,
+ false,
+ );
+ let mut l1 = FixedSizeListArrayReader::new(Box::new(l2), 1, l1_type, 3, 1, true);
+
+ let expected_1 = expected.slice(0, 2);
+ let expected_2 = expected.slice(2, 4);
+
+ let actual = l1.next_batch(2).unwrap();
+ assert_eq!(actual.as_ref(), &expected_1);
+
+ let actual = l1.next_batch(1024).unwrap();
+ assert_eq!(actual.as_ref(), &expected_2);
+ }
+
+ #[test]
+ fn test_empty_list() {
+ // [null, [], null, []]
+ let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ vec![None, Some([]), None, Some([])],
+ 0,
+ );
+
+ let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+ None, None, None, None,
+ ]));
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![0, 1, 0, 1]),
+ Some(vec![0, 0, 0, 0]),
+ );
+
+ let mut list_array_reader = FixedSizeListArrayReader::new(
+ Box::new(item_array_reader),
+ 0,
+ ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", ArrowType::Int32, true)),
+ 0,
+ ),
+ 2,
+ 1,
+ true,
+ );
+ let actual = list_array_reader.next_batch(1024).unwrap();
+ let actual = actual
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .unwrap();
+ assert_eq!(&expected, actual)
+ }
+
+ #[test]
+ fn test_nested_var_list() {
+ // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
+ let mut builder =
+ FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
+ builder.values().append_value([Some(1), None, Some(3)]);
+ builder.values().append_null();
+ builder.append(true);
+ builder.values().append_value([Some(4)]);
+ builder.values().append_value([]);
+ builder.append(true);
+ builder.values().append_value([Some(5), Some(6)]);
+ builder.values().append_value([None, None]);
+ builder.append(true);
+ builder.values().append_null();
+ builder.values().append_null();
+ builder.append(false);
+ let expected = builder.finish();
+
+ let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+ Some(1),
+ None,
+ Some(3),
+ None,
+ Some(4),
+ None,
+ Some(5),
+ Some(6),
+ None,
+ None,
+ None,
+ ]));
+
+ let inner_type =
+ ArrowType::List(Arc::new(Field::new("item", ArrowType::Int32, true)));
+ let list_type = ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", inner_type.clone(), true)),
+ 2,
+ );
+
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
+ Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
+ );
+
+ let inner_array_reader = ListArrayReader::<i32>::new(
+ Box::new(item_array_reader),
+ inner_type,
+ 4,
+ 2,
+ true,
+ );
+
+ let mut list_array_reader = FixedSizeListArrayReader::new(
+ Box::new(inner_array_reader),
+ 2,
+ list_type,
+ 2,
+ 1,
+ true,
+ );
+ let actual = list_array_reader.next_batch(1024).unwrap();
+ let actual = actual
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .unwrap();
+ assert_eq!(&expected, actual)
+ }
+
+ #[test]
+ fn test_read_list_column() {
+ // This test writes a Parquet file containing a fixed-length array column and a primitive column,
+ // then reads the columns back from the file.
+
+ // [
+ // [1, 2, 3, null],
+ // [5, 6, 7, 8],
+ // null,
+ // [9, null, 11, 12],
+ // ]
+ let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ vec![
+ Some(vec![Some(1), Some(2), Some(3), None]),
+ Some(vec![Some(5), Some(6), Some(7), Some(8)]),
+ None,
+ Some(vec![Some(9), None, Some(11), Some(12)]),
+ Some(vec![None, None, None, None]),
+ ],
+ 4,
+ );
+
+ // [null, 2, 3, null, 5]
+ let primitive = PrimitiveArray::<Int32Type>::from_iter(vec![
+ None,
+ Some(2),
+ Some(3),
+ None,
+ Some(5),
+ ]);
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "list",
+ ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", ArrowType::Int32, true)),
+ 4,
+ ),
+ true,
+ ),
+ Field::new("primitive", ArrowType::Int32, true),
+ ]));
+
+ // Create record batch with a fixed-length array column and a primitive column
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(list.clone()), Arc::new(primitive.clone())],
+ )
+ .expect("unable to create record batch");
+
+ // Write record batch to Parquet
+ let mut buffer = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)
+ .expect("unable to create parquet writer");
+ writer.write(&batch).expect("unable to write record batch");
+ writer.close().expect("unable to close parquet writer");
+
+ // Read record batch from Parquet
+ let reader = Bytes::from(buffer);
+ let mut batch_reader = ParquetRecordBatchReader::try_new(reader, 1024)
+ .expect("unable to create parquet reader");
+ let actual = batch_reader
+ .next()
+ .expect("missing record batch")
+ .expect("unable to read record batch");
+
+ // Verify values of both read columns match
+ assert_eq!(schema, actual.schema());
+ let actual_list = actual
+ .column(0)
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .expect("unable to cast array to FixedSizeListArray");
+ let actual_primitive = actual.column(1).as_primitive::<Int32Type>();
+ assert_eq!(actual_list, &list);
+ assert_eq!(actual_primitive, &primitive);
+ }
+
+ #[test]
+ fn test_read_as_dyn_list() {
+ // This test verifies that fixed-size list arrays can be read from Parquet
+ // as variable-length list arrays.
+
+ // [
+ // [1, 2, 3, null],
+ // [5, 6, 7, 8],
+ // null,
+ // [9, null, 11, 12],
+ // ]
+ let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ vec![
+ Some(vec![Some(1), Some(2), Some(3), None]),
+ Some(vec![Some(5), Some(6), Some(7), Some(8)]),
+ None,
+ Some(vec![Some(9), None, Some(11), Some(12)]),
+ Some(vec![None, None, None, None]),
+ ],
+ 4,
+ );
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "list",
+ ArrowType::FixedSizeList(
+ Arc::new(Field::new("item", ArrowType::Int32, true)),
+ 4,
+ ),
+ true,
+ )]));
+
+ // Create record batch with a single fixed-length array column
+ let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list)]).unwrap();
+
+ // Write record batch to Parquet
+ let mut buffer = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
+ .expect("unable to create parquet writer");
+ writer.write(&batch).expect("unable to write record batch");
+ writer.close().expect("unable to close parquet writer");
+
+ // Read record batch from Parquet - ignoring arrow metadata
+ let reader = Bytes::from(buffer);
+ let mut batch_reader = ArrowReaderBuilder::try_new_with_options(
+ reader,
+ ArrowReaderOptions::new().with_skip_arrow_metadata(true),
+ )
+ .expect("unable to create reader builder")
+ .build()
+ .expect("unable to create parquet reader");
+ let actual = batch_reader
+ .next()
+ .expect("missing record batch")
+ .expect("unable to read record batch");
+
+ // Verify the read column is a variable length list with values that match the input
+ let col = actual.column(0).as_list::<i32>();
+ let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+ Some(vec![Some(1), Some(2), Some(3), None]),
+ Some(vec![Some(5), Some(6), Some(7), Some(8)]),
+ None,
+ Some(vec![Some(9), None, Some(11), Some(12)]),
+ Some(vec![None, None, None, None]),
+ ]);
+ assert_eq!(col, &expected);
+ }
+}
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index f46f6073a..823084b43 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -35,6 +35,7 @@ mod byte_array;
mod byte_array_dictionary;
mod empty_array;
mod fixed_len_byte_array;
+mod fixed_size_list_array;
mod list_array;
mod map_array;
mod null_array;
@@ -48,6 +49,7 @@ pub use builder::build_array_reader;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
+pub use fixed_size_list_array::FixedSizeListArrayReader;
pub use list_array::ListArrayReader;
pub use map_array::MapArrayReader;
pub use null_array::NullArrayReader;
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index fc5b94603..21b3e7dff 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -42,7 +42,7 @@
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, OffsetSizeTrait, StructArray};
+use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field};
use std::ops::Range;
@@ -144,7 +144,8 @@ impl LevelInfoBuilder {
}
DataType::List(child)
| DataType::LargeList(child)
- | DataType::Map(child, _) => {
+ | DataType::Map(child, _)
+ | DataType::FixedSizeList(child, _) => {
let def_level = match field.is_nullable() {
true => parent_ctx.def_level + 2,
false => parent_ctx.def_level + 1,
@@ -214,6 +215,19 @@ impl LevelInfoBuilder {
range,
)
}
+ &DataType::FixedSizeList(_, size) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .expect("unable to get fixed-size list array");
+
+ self.write_fixed_size_list(
+ size as usize,
+ array.nulls(),
+ array.values(),
+ range,
+ )
+ }
_ => unreachable!(),
}
}
@@ -371,6 +385,100 @@ impl LevelInfoBuilder {
}
}
+ /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+ fn write_fixed_size_list(
+ &mut self,
+ fixed_size: usize,
+ nulls: Option<&NullBuffer>,
+ values: &dyn Array,
+ range: Range<usize>,
+ ) {
+ let (child, ctx) = match self {
+ Self::List(child, ctx) => (child, ctx),
+ _ => unreachable!(),
+ };
+
+ let write_non_null =
+ |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+ let values_start = start_idx * fixed_size;
+ let values_end = end_idx * fixed_size;
+ child.write(values, values_start..values_end);
+
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+ let row_indices = (0..fixed_size)
+ .rev()
+ .cycle()
+ .take(values_end - values_start);
+
+ // Step backward over the child rep levels and mark the start of each list
+ rep_levels
+ .iter_mut()
+ .rev()
+ // Filter out reps from nested children
+ .filter(|&&mut r| r == ctx.rep_level)
+ .zip(row_indices)
+ .for_each(|(r, idx)| {
+ if idx == 0 {
+ *r = ctx.rep_level - 1;
+ }
+ });
+ })
+ };
+
+ // If list size is 0, ignore values and just write rep/def levels.
+ let write_empty =
+ |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+ let len = end_idx - start_idx;
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+ let def_levels = leaf.def_levels.as_mut().unwrap();
+ def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+ })
+ };
+
+ let write_rows =
+ |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+ if fixed_size > 0 {
+ write_non_null(child, start_idx, end_idx)
+ } else {
+ write_empty(child, start_idx, end_idx)
+ }
+ };
+
+ match nulls {
+ Some(nulls) => {
+ let mut start_idx = None;
+ for idx in range.clone() {
+ if nulls.is_valid(idx) {
+ // Start a run of valid rows if not already inside of one
+ start_idx.get_or_insert(idx);
+ } else {
+ // Write out any pending valid rows
+ if let Some(start) = start_idx.take() {
+ write_rows(child, start, idx);
+ }
+ // Add null row
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ rep_levels.push(ctx.rep_level - 1);
+ let def_levels = leaf.def_levels.as_mut().unwrap();
+ def_levels.push(ctx.def_level - 2);
+ })
+ }
+ }
+ // Write out any remaining valid rows
+ if let Some(start) = start_idx.take() {
+ write_rows(child, start, range.end);
+ }
+ }
+ // If all rows are valid then write the whole array
+ None => write_rows(child, range.start, range.end),
+ }
+ }
+
/// Write a primitive array, as defined by [`is_leaf`]
fn write_leaf(&mut self, array: &dyn Array, range: Range<usize>) {
let info = match self {
@@ -1397,4 +1505,260 @@ mod tests {
assert_eq!(&levels[1], &expected_level);
}
+
+ #[test]
+ fn test_fixed_size_list() {
+ // [[1, 2], null, null, [7, 8], null]
+ let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
+ builder.values().append_slice(&[1, 2]);
+ builder.append(true);
+ builder.values().append_slice(&[3, 4]);
+ builder.append(false);
+ builder.values().append_slice(&[5, 6]);
+ builder.append(false);
+ builder.values().append_slice(&[7, 8]);
+ builder.append(true);
+ builder.values().append_slice(&[9, 10]);
+ builder.append(false);
+ let a = builder.finish();
+
+ let item_field = Field::new("item", a.data_type().clone(), true);
+ let mut builder =
+ LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+ builder.write(&a, 1..4);
+ let levels = builder.finish();
+
+ assert_eq!(levels.len(), 1);
+
+ let list_level = levels.get(0).unwrap();
+
+ let expected_level = LevelInfo {
+ def_levels: Some(vec![0, 0, 3, 3]),
+ rep_levels: Some(vec![0, 0, 0, 1]),
+ non_null_indices: vec![6, 7],
+ max_def_level: 3,
+ max_rep_level: 1,
+ };
+ assert_eq!(list_level, &expected_level);
+ }
+
+ #[test]
+ fn test_fixed_size_list_of_struct() {
+ // define schema
+ let field_a = Field::new("a", DataType::Int32, true);
+ let field_b = Field::new("b", DataType::Int64, false);
+ let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
+ let item_field = Field::new("item", DataType::Struct(fields.clone()), true);
+ let list_field = Field::new(
+ "list",
+ DataType::FixedSizeList(Arc::new(item_field), 2),
+ true,
+ );
+
+ let builder_a = Int32Builder::with_capacity(10);
+ let builder_b = Int64Builder::with_capacity(10);
+ let struct_builder =
+ StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
+ let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
+
+ // [
+ // [{a: 1, b: 2}, null],
+ // null,
+ // [null, null],
+ // [{a: null, b: 3}, {a: 2, b: 4}]
+ // ]
+
+ // [{a: 1, b: 2}, null]
+ let values = list_builder.values();
+ // {a: 1, b: 2}
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_value(1);
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(2);
+ values.append(true);
+ // null
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null();
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(0);
+ values.append(false);
+ list_builder.append(true);
+
+ // null
+ let values = list_builder.values();
+ // null
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null();
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(0);
+ values.append(false);
+ // null
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null();
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(0);
+ values.append(false);
+ list_builder.append(false);
+
+ // [null, null]
+ let values = list_builder.values();
+ // null
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null();
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(0);
+ values.append(false);
+ // null
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null();
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(0);
+ values.append(false);
+ list_builder.append(true);
+
+ // [{a: null, b: 3}, {a: 2, b: 4}]
+ let values = list_builder.values();
+ // {a: null, b: 3}
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null();
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(3);
+ values.append(true);
+ // {a: 2, b: 4}
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_value(2);
+ values
+ .field_builder::<Int64Builder>(1)
+ .unwrap()
+ .append_value(4);
+ values.append(true);
+ list_builder.append(true);
+
+ let array = Arc::new(list_builder.finish());
+
+ assert_eq!(array.values().len(), 8);
+ assert_eq!(array.len(), 4);
+
+ let schema = Arc::new(Schema::new(vec![list_field]));
+ let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+ let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
+ let a_levels = &levels[0];
+ let b_levels = &levels[1];
+
+ // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
+ let expected_a = LevelInfo {
+ def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
+ rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
+ non_null_indices: vec![0, 7],
+ max_def_level: 4,
+ max_rep_level: 1,
+ };
+ // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
+ let expected_b = LevelInfo {
+ def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
+ rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
+ non_null_indices: vec![0, 6, 7],
+ max_def_level: 3,
+ max_rep_level: 1,
+ };
+
+ assert_eq!(a_levels, &expected_a);
+ assert_eq!(b_levels, &expected_b);
+ }
+
+ #[test]
+ fn test_fixed_size_list_empty() {
+ let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
+ builder.append(true);
+ builder.append(false);
+ builder.append(true);
+ let a = builder.finish();
+
+ let item_field = Field::new("item", a.data_type().clone(), true);
+ let mut builder =
+ LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+ builder.write(&a, 0..3);
+ let levels = builder.finish();
+
+ assert_eq!(levels.len(), 1);
+
+ let list_level = levels.get(0).unwrap();
+
+ let expected_level = LevelInfo {
+ def_levels: Some(vec![1, 0, 1]),
+ rep_levels: Some(vec![0, 0, 0]),
+ non_null_indices: vec![],
+ max_def_level: 3,
+ max_rep_level: 1,
+ };
+ assert_eq!(list_level, &expected_level);
+ }
+
+ #[test]
+ fn test_fixed_size_list_of_var_lists() {
+ // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
+ let mut builder =
+ FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
+ builder.values().append_value([Some(1), None, Some(3)]);
+ builder.values().append_null();
+ builder.append(true);
+ builder.values().append_value([Some(4)]);
+ builder.values().append_value([]);
+ builder.append(true);
+ builder.values().append_value([Some(5), Some(6)]);
+ builder.values().append_value([None, None]);
+ builder.append(true);
+ builder.values().append_null();
+ builder.values().append_null();
+ builder.append(false);
+ let a = builder.finish();
+
+ let item_field = Field::new("item", a.data_type().clone(), true);
+ let mut builder =
+ LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+ builder.write(&a, 0..4);
+ let levels = builder.finish();
+
+ let list_level = levels.get(0).unwrap();
+ let expected_level = LevelInfo {
+ def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
+ rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
+ non_null_indices: vec![0, 2, 3, 4, 5],
+ max_def_level: 5,
+ max_rep_level: 2,
+ };
+
+ assert_eq!(list_level, &expected_level);
+ }
}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index af8202182..cfad15550 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow_array::cast::AsArray;
use arrow_array::types::{Decimal128Type, Int32Type, Int64Type, UInt32Type, UInt64Type};
-use arrow_array::{types, Array, ArrayRef, RecordBatch, RecordBatchWriter};
+use arrow_array::{
+ types, Array, ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchWriter,
+};
use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};
use super::schema::{
@@ -380,7 +382,17 @@ fn write_leaves<W: Write>(
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),
)),
- ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
+ ArrowDataType::FixedSizeList(_, _) => {
+ let arrays: Vec<_> = arrays.iter().map(|array|{
+ array.as_any().downcast_ref::<FixedSizeListArray>()
+ .expect("unable to get fixed-size list array")
+ .values()
+ .clone()
+ }).collect();
+ write_leaves(row_group_writer, &arrays, levels)?;
+ Ok(())
+ },
+ ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"