You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/15 14:09:30 UTC
[arrow-rs] branch master updated: Split out ListArrayReader into separate module (#1483) (#1563)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5dca6f078 Split out ListArrayReader into separate module (#1483) (#1563)
5dca6f078 is described below
commit 5dca6f07884745b5839cb91739faefd52f12b02b
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Apr 15 15:09:25 2022 +0100
Split out ListArrayReader into separate module (#1483) (#1563)
* Split out ListArrayReader into separate module (#1483)
* Fix merge conflict
---
parquet/src/arrow/array_reader.rs | 430 +--------------------------
parquet/src/arrow/array_reader/list_array.rs | 379 +++++++++++++++++++++++
parquet/src/arrow/array_reader/test_util.rs | 85 +++++-
3 files changed, 473 insertions(+), 421 deletions(-)
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f60e47c89..f4139b2f6 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -24,19 +24,17 @@ use std::sync::Arc;
use std::vec::Vec;
use arrow::array::{
- new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray,
- BooleanBufferBuilder, DecimalArray, GenericListArray, Int16BufferBuilder, Int32Array,
- Int64Array, OffsetSizeTrait, PrimitiveArray, StructArray, UInt32Array,
+ Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
+ DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray,
+ StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
-use arrow::compute::take;
use arrow::datatypes::{
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
- Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, ToByteSlice,
+ Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type,
UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
};
-use arrow::util::bit_util;
use crate::arrow::converter::Converter;
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
@@ -56,6 +54,7 @@ mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod empty_array;
+mod list_array;
mod map_array;
mod offset_buffer;
@@ -65,6 +64,7 @@ mod test_util;
pub use builder::build_array_reader;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
+pub use list_array::ListArrayReader;
pub use map_array::MapArrayReader;
/// Array reader reads parquet data into arrow array.
@@ -649,157 +649,6 @@ where
}
}
-/// Implementation of list array reader.
-pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
- item_reader: Box<dyn ArrayReader>,
- data_type: ArrowType,
- item_type: ArrowType,
- list_def_level: i16,
- list_rep_level: i16,
- list_empty_def_level: i16,
- list_null_def_level: i16,
- def_level_buffer: Option<Buffer>,
- rep_level_buffer: Option<Buffer>,
- _marker: PhantomData<OffsetSize>,
-}
-
-impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
- /// Construct list array reader.
- pub fn new(
- item_reader: Box<dyn ArrayReader>,
- data_type: ArrowType,
- item_type: ArrowType,
- def_level: i16,
- rep_level: i16,
- list_null_def_level: i16,
- list_empty_def_level: i16,
- ) -> Self {
- Self {
- item_reader,
- data_type,
- item_type,
- list_def_level: def_level,
- list_rep_level: rep_level,
- list_null_def_level,
- list_empty_def_level,
- def_level_buffer: None,
- rep_level_buffer: None,
- _marker: PhantomData,
- }
- }
-}
-
-/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported.
-impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- /// Returns data type.
- /// This must be a List.
- fn get_data_type(&self) -> &ArrowType {
- &self.data_type
- }
-
- fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
- let next_batch_array = self.item_reader.next_batch(batch_size)?;
-
- if next_batch_array.len() == 0 {
- return Ok(new_empty_array(&self.data_type));
- }
- let def_levels = self
- .item_reader
- .get_def_levels()
- .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
- let rep_levels = self
- .item_reader
- .get_rep_levels()
- .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;
-
- if !((def_levels.len() == rep_levels.len())
- && (rep_levels.len() == next_batch_array.len()))
- {
- return Err(ArrowError(
- format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()),
- ));
- }
-
- // List definitions can be encoded as 4 values:
- // - n + 0: the list slot is null
- // - n + 1: the list slot is not null, but is empty (i.e. [])
- // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ])
- // - n + 3: the list slot is not null, and its child is not empty
- // Where n is the max definition level of the list's parent.
- // If a Parquet schema's only leaf is the list, then n = 0.
-
- // If the list index is at empty definition, the child slot is null
- let non_null_list_indices =
- def_levels.iter().enumerate().filter_map(|(index, def)| {
- (*def > self.list_empty_def_level).then(|| index as u32)
- });
- let indices = UInt32Array::from_iter_values(non_null_list_indices);
- let batch_values = take(&*next_batch_array.clone(), &indices, None)?;
-
- // first item in each list has rep_level = 0, subsequent items have rep_level = 1
- let mut offsets: Vec<OffsetSize> = Vec::new();
- let mut cur_offset = OffsetSize::zero();
- def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
- if *r == 0 || d == &self.list_empty_def_level {
- offsets.push(cur_offset);
- }
- if d > &self.list_empty_def_level {
- cur_offset += OffsetSize::one();
- }
- });
-
- offsets.push(cur_offset);
-
- let num_bytes = bit_util::ceil(offsets.len(), 8);
- // TODO: A useful optimization is to use the null count to fill with
- // 0 or null, to reduce individual bits set in a loop.
- // To favour dense data, set every slot to true, then unset
- let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
- let null_slice = null_buf.as_slice_mut();
- let mut list_index = 0;
- for i in 0..rep_levels.len() {
- // If the level is lower than empty, then the slot is null.
- // When a list is non-nullable, its empty level = null level,
- // so this automatically factors that in.
- if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
- bit_util::unset_bit(null_slice, list_index);
- }
- if rep_levels[i] == 0 {
- list_index += 1;
- }
- }
- let value_offsets = Buffer::from(&offsets.to_byte_slice());
-
- let list_data = ArrayData::builder(self.get_data_type().clone())
- .len(offsets.len() - 1)
- .add_buffer(value_offsets)
- .add_child_data(batch_values.data().clone())
- .null_bit_buffer(null_buf.into())
- .offset(next_batch_array.offset());
-
- let list_data = unsafe { list_data.build_unchecked() };
-
- let result_array = GenericListArray::<OffsetSize>::from(list_data);
- Ok(Arc::new(result_array))
- }
-
- fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_level_buffer
- .as_ref()
- .map(|buf| unsafe { buf.typed_data() })
- }
-
- fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_level_buffer
- .as_ref()
- .map(|buf| unsafe { buf.typed_data() })
- }
-}
-
/// Implementation of struct array reader.
pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
@@ -978,19 +827,14 @@ impl ArrayReader for StructArrayReader {
#[cfg(test)]
mod tests {
- use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
use rand::distributions::uniform::SampleUniform;
use rand::{thread_rng, Rng};
- use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
- use arrow::array::{
- Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
- StructArray,
- };
- use arrow::datatypes::DataType::Struct;
+ use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+ use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
@@ -1002,11 +846,8 @@ mod tests {
use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::basic::{Encoding, Type as PhysicalType};
- use crate::column::page::{Page, PageReader};
+ use crate::column::page::Page;
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
- use crate::errors::Result;
- use crate::file::properties::WriterProperties;
- use crate::file::serialized_reader::SerializedFileReader;
use crate::schema::parser::parse_message_type;
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::util::test_common::make_pages;
@@ -1072,7 +913,7 @@ mod tests {
.unwrap();
let column_desc = schema.column(0);
- let page_iterator = EmptyPageIterator::new(schema);
+ let page_iterator = test_util::EmptyPageIterator::new(schema);
let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
@@ -1690,81 +1531,6 @@ mod tests {
);
}
- /// Array reader for test.
- struct InMemoryArrayReader {
- data_type: ArrowType,
- array: ArrayRef,
- def_levels: Option<Vec<i16>>,
- rep_levels: Option<Vec<i16>>,
- }
-
- impl InMemoryArrayReader {
- pub fn new(
- data_type: ArrowType,
- array: ArrayRef,
- def_levels: Option<Vec<i16>>,
- rep_levels: Option<Vec<i16>>,
- ) -> Self {
- Self {
- data_type,
- array,
- def_levels,
- rep_levels,
- }
- }
- }
-
- impl ArrayReader for InMemoryArrayReader {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_data_type(&self) -> &ArrowType {
- &self.data_type
- }
-
- fn next_batch(&mut self, _batch_size: usize) -> Result<ArrayRef> {
- Ok(self.array.clone())
- }
-
- fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels.as_deref()
- }
-
- fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels.as_deref()
- }
- }
-
- /// Iterator for testing reading empty columns
- struct EmptyPageIterator {
- schema: SchemaDescPtr,
- }
-
- impl EmptyPageIterator {
- fn new(schema: SchemaDescPtr) -> Self {
- EmptyPageIterator { schema }
- }
- }
-
- impl Iterator for EmptyPageIterator {
- type Item = Result<Box<dyn PageReader>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- None
- }
- }
-
- impl PageIterator for EmptyPageIterator {
- fn schema(&mut self) -> Result<SchemaDescPtr> {
- Ok(self.schema.clone())
- }
-
- fn column_schema(&mut self) -> Result<ColumnDescPtr> {
- Ok(self.schema.column(0))
- }
- }
-
#[test]
fn test_struct_array_reader() {
let array_1 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![1, 2, 3, 4, 5]));
@@ -1814,180 +1580,4 @@ mod tests {
struct_array_reader.get_rep_levels()
);
}
-
- #[test]
- fn test_list_array_reader() {
- // [[1, null, 2], null, [3, 4]]
- let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
- Some(1),
- None,
- Some(2),
- None,
- Some(3),
- Some(4),
- ]));
- let item_array_reader = InMemoryArrayReader::new(
- ArrowType::Int32,
- array,
- Some(vec![3, 2, 3, 0, 3, 3]),
- Some(vec![0, 1, 1, 0, 0, 1]),
- );
-
- let mut list_array_reader = ListArrayReader::<i32>::new(
- Box::new(item_array_reader),
- ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))),
- ArrowType::Int32,
- 1,
- 1,
- 0,
- 1,
- );
-
- let next_batch = list_array_reader.next_batch(1024).unwrap();
- let list_array = next_batch.as_any().downcast_ref::<ListArray>().unwrap();
-
- assert_eq!(3, list_array.len());
- // This passes as I expect
- assert_eq!(1, list_array.null_count());
-
- assert_eq!(
- list_array
- .value(0)
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap(),
- &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
- );
-
- assert!(list_array.is_null(1));
-
- assert_eq!(
- list_array
- .value(2)
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap(),
- &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
- );
- }
-
- #[test]
- fn test_large_list_array_reader() {
- // [[1, null, 2], null, [3, 4]]
- let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
- Some(1),
- None,
- Some(2),
- None,
- Some(3),
- Some(4),
- ]));
- let item_array_reader = InMemoryArrayReader::new(
- ArrowType::Int32,
- array,
- Some(vec![3, 2, 3, 0, 3, 3]),
- Some(vec![0, 1, 1, 0, 0, 1]),
- );
-
- let mut list_array_reader = ListArrayReader::<i64>::new(
- Box::new(item_array_reader),
- ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))),
- ArrowType::Int32,
- 1,
- 1,
- 0,
- 1,
- );
-
- let next_batch = list_array_reader.next_batch(1024).unwrap();
- let list_array = next_batch
- .as_any()
- .downcast_ref::<LargeListArray>()
- .unwrap();
-
- assert_eq!(3, list_array.len());
-
- assert_eq!(
- list_array
- .value(0)
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap(),
- &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
- );
-
- assert!(list_array.is_null(1));
-
- assert_eq!(
- list_array
- .value(2)
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap(),
- &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
- );
- }
-
- #[test]
- fn test_nested_lists() {
- // Construct column schema
- let message_type = "
- message table {
- REPEATED group table_info {
- REQUIRED BYTE_ARRAY name;
- REPEATED group cols {
- REQUIRED BYTE_ARRAY name;
- REQUIRED INT32 type;
- OPTIONAL INT32 length;
- }
- REPEATED group tags {
- REQUIRED BYTE_ARRAY name;
- REQUIRED INT32 type;
- OPTIONAL INT32 length;
- }
- }
- }
- ";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
-
- let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();
-
- let file = tempfile::tempfile().unwrap();
- let props = WriterProperties::builder()
- .set_max_row_group_size(200)
- .build();
-
- let mut writer = ArrowWriter::try_new(
- file.try_clone().unwrap(),
- Arc::new(arrow_schema),
- Some(props),
- )
- .unwrap();
- writer.close().unwrap();
-
- let file_reader: Arc<dyn FileReader> =
- Arc::new(SerializedFileReader::new(file).unwrap());
-
- let file_metadata = file_reader.metadata().file_metadata();
- let arrow_schema = parquet_to_arrow_schema(
- file_metadata.schema_descr(),
- file_metadata.key_value_metadata(),
- )
- .unwrap();
-
- let mut array_reader = build_array_reader(
- file_reader.metadata().file_metadata().schema_descr_ptr(),
- Arc::new(arrow_schema.clone()),
- vec![0usize].into_iter(),
- Box::new(file_reader),
- )
- .unwrap();
-
- let batch = array_reader.next_batch(100).unwrap();
- assert_eq!(batch.data_type(), &Struct(arrow_schema.fields().clone()));
- assert_eq!(batch.len(), 0);
- }
}
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
new file mode 100644
index 000000000..31c52c9c5
--- /dev/null
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError::ArrowError;
+use crate::errors::Result;
+use arrow::array::{
+ new_empty_array, ArrayData, ArrayRef, GenericListArray,
+ OffsetSizeTrait, UInt32Array,
+};
+use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::datatypes::DataType as ArrowType;
+use arrow::datatypes::ToByteSlice;
+use arrow::util::bit_util;
+use std::any::Any;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+/// Implementation of list array reader.
+pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ list_def_level: i16,
+ list_rep_level: i16,
+ list_empty_def_level: i16,
+ list_null_def_level: i16,
+ def_level_buffer: Option<Buffer>,
+ rep_level_buffer: Option<Buffer>,
+ _marker: PhantomData<OffsetSize>,
+}
+
+impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
+ /// Construct list array reader.
+ pub fn new(
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ list_null_def_level: i16,
+ list_empty_def_level: i16,
+ ) -> Self {
+ Self {
+ item_reader,
+ data_type,
+ item_type,
+ list_def_level: def_level,
+ list_rep_level: rep_level,
+ list_null_def_level,
+ list_empty_def_level,
+ def_level_buffer: None,
+ rep_level_buffer: None,
+ _marker: PhantomData,
+ }
+ }
+}
+
+/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported.
+impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Returns data type.
+ /// This must be a List.
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ let next_batch_array = self.item_reader.next_batch(batch_size)?;
+
+ if next_batch_array.len() == 0 {
+ return Ok(new_empty_array(&self.data_type));
+ }
+ let def_levels = self
+ .item_reader
+ .get_def_levels()
+ .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
+ let rep_levels = self
+ .item_reader
+ .get_rep_levels()
+ .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;
+
+ if !((def_levels.len() == rep_levels.len())
+ && (rep_levels.len() == next_batch_array.len()))
+ {
+ return Err(ArrowError(
+ format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()),
+ ));
+ }
+
+ // List definitions can be encoded as 4 values:
+ // - n + 0: the list slot is null
+ // - n + 1: the list slot is not null, but is empty (i.e. [])
+ // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ])
+ // - n + 3: the list slot is not null, and its child is not empty
+ // Where n is the max definition level of the list's parent.
+ // If a Parquet schema's only leaf is the list, then n = 0.
+
+ // If the list index is at empty definition, the child slot is null
+ let non_null_list_indices =
+ def_levels.iter().enumerate().filter_map(|(index, def)| {
+ (*def > self.list_empty_def_level).then(|| index as u32)
+ });
+ let indices = UInt32Array::from_iter_values(non_null_list_indices);
+ let batch_values =
+ arrow::compute::take(&*next_batch_array.clone(), &indices, None)?;
+
+ // first item in each list has rep_level = 0, subsequent items have rep_level = 1
+ let mut offsets: Vec<OffsetSize> = Vec::new();
+ let mut cur_offset = OffsetSize::zero();
+ def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
+ if *r == 0 || d == &self.list_empty_def_level {
+ offsets.push(cur_offset);
+ }
+ if d > &self.list_empty_def_level {
+ cur_offset += OffsetSize::one();
+ }
+ });
+
+ offsets.push(cur_offset);
+
+ let num_bytes = bit_util::ceil(offsets.len(), 8);
+ // TODO: A useful optimization is to use the null count to fill with
+ // 0 or null, to reduce individual bits set in a loop.
+ // To favour dense data, set every slot to true, then unset
+ let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
+ let null_slice = null_buf.as_slice_mut();
+ let mut list_index = 0;
+ for i in 0..rep_levels.len() {
+ // If the level is lower than empty, then the slot is null.
+ // When a list is non-nullable, its empty level = null level,
+ // so this automatically factors that in.
+ if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
+ bit_util::unset_bit(null_slice, list_index);
+ }
+ if rep_levels[i] == 0 {
+ list_index += 1;
+ }
+ }
+ let value_offsets = Buffer::from(&offsets.to_byte_slice());
+
+ let list_data = ArrayData::builder(self.get_data_type().clone())
+ .len(offsets.len() - 1)
+ .add_buffer(value_offsets)
+ .add_child_data(batch_values.data().clone())
+ .null_bit_buffer(null_buf.into())
+ .offset(next_batch_array.offset());
+
+ let list_data = unsafe { list_data.build_unchecked() };
+
+ let result_array = GenericListArray::<OffsetSize>::from(list_data);
+ Ok(Arc::new(result_array))
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.def_level_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.rep_level_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::arrow::array_reader::build_array_reader;
+ use crate::arrow::array_reader::list_array::ListArrayReader;
+ use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+ use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
+ use crate::file::properties::WriterProperties;
+ use crate::file::reader::{FileReader, SerializedFileReader};
+ use crate::schema::parser::parse_message_type;
+ use crate::schema::types::SchemaDescriptor;
+ use arrow::array::{Array, LargeListArray, ListArray, PrimitiveArray};
+ use arrow::datatypes::{Field, Int32Type as ArrowInt32};
+ use std::sync::Arc;
+
+ #[test]
+ fn test_list_array_reader() {
+ // [[1, null, 2], null, [3, 4]]
+ let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ Some(4),
+ ]));
+
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![3, 2, 3, 0, 3, 3]),
+ Some(vec![0, 1, 1, 0, 0, 1]),
+ );
+
+ let mut list_array_reader = ListArrayReader::<i32>::new(
+ Box::new(item_array_reader),
+ ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))),
+ ArrowType::Int32,
+ 1,
+ 1,
+ 0,
+ 1,
+ );
+
+ let next_batch = list_array_reader.next_batch(1024).unwrap();
+ let list_array = next_batch.as_any().downcast_ref::<ListArray>().unwrap();
+
+ assert_eq!(3, list_array.len());
+ // This passes as I expect
+ assert_eq!(1, list_array.null_count());
+
+ assert_eq!(
+ list_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+ );
+
+ assert!(list_array.is_null(1));
+
+ assert_eq!(
+ list_array
+ .value(2)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
+ );
+ }
+
+ #[test]
+ fn test_large_list_array_reader() {
+ // [[1, null, 2], null, [3, 4]]
+ let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ Some(4),
+ ]));
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![3, 2, 3, 0, 3, 3]),
+ Some(vec![0, 1, 1, 0, 0, 1]),
+ );
+
+ let mut list_array_reader = ListArrayReader::<i64>::new(
+ Box::new(item_array_reader),
+ ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))),
+ ArrowType::Int32,
+ 1,
+ 1,
+ 0,
+ 1,
+ );
+
+ let next_batch = list_array_reader.next_batch(1024).unwrap();
+ let list_array = next_batch
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .unwrap();
+
+ assert_eq!(3, list_array.len());
+
+ assert_eq!(
+ list_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+ );
+
+ assert!(list_array.is_null(1));
+
+ assert_eq!(
+ list_array
+ .value(2)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
+ );
+ }
+
+ #[test]
+ fn test_nested_lists() {
+ // Construct column schema
+ let message_type = "
+ message table {
+ REPEATED group table_info {
+ REQUIRED BYTE_ARRAY name;
+ REPEATED group cols {
+ REQUIRED BYTE_ARRAY name;
+ REQUIRED INT32 type;
+ OPTIONAL INT32 length;
+ }
+ REPEATED group tags {
+ REQUIRED BYTE_ARRAY name;
+ REQUIRED INT32 type;
+ OPTIONAL INT32 length;
+ }
+ }
+ }
+ ";
+
+ let schema = parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap();
+
+ let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();
+
+ let file = tempfile::tempfile().unwrap();
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(200)
+ .build();
+
+ let mut writer = ArrowWriter::try_new(
+ file.try_clone().unwrap(),
+ Arc::new(arrow_schema),
+ Some(props),
+ )
+ .unwrap();
+ writer.close().unwrap();
+
+ let file_reader: Arc<dyn FileReader> =
+ Arc::new(SerializedFileReader::new(file).unwrap());
+
+ let file_metadata = file_reader.metadata().file_metadata();
+ let arrow_schema = parquet_to_arrow_schema(
+ file_metadata.schema_descr(),
+ file_metadata.key_value_metadata(),
+ )
+ .unwrap();
+
+ let mut array_reader = build_array_reader(
+ file_reader.metadata().file_metadata().schema_descr_ptr(),
+ Arc::new(arrow_schema.clone()),
+ vec![0usize].into_iter(),
+ Box::new(file_reader),
+ )
+ .unwrap();
+
+ let batch = array_reader.next_batch(100).unwrap();
+ assert_eq!(
+ batch.data_type(),
+ &ArrowType::Struct(arrow_schema.fields().clone())
+ );
+ assert_eq!(batch.len(), 0);
+ }
+}
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index b04a597c9..f212d05b0 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -15,12 +15,20 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::ArrayRef;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
use std::sync::Arc;
+use crate::arrow::array_reader::ArrayReader;
use crate::basic::{ConvertedType, Encoding, Type as PhysicalType};
+use crate::column::page::{PageIterator, PageReader};
use crate::data_type::{ByteArray, ByteArrayType};
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
-use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
+use crate::errors::Result;
+use crate::schema::types::{
+ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type,
+};
use crate::util::memory::{ByteBufferPtr, MemTracker};
/// Returns a descriptor for a UTF-8 column
@@ -87,3 +95,78 @@ pub fn byte_array_all_encodings(
(pages, encoded_dictionary)
}
+
+/// Array reader for test.
+pub struct InMemoryArrayReader {
+ data_type: ArrowType,
+ array: ArrayRef,
+ def_levels: Option<Vec<i16>>,
+ rep_levels: Option<Vec<i16>>,
+}
+
+impl InMemoryArrayReader {
+ pub fn new(
+ data_type: ArrowType,
+ array: ArrayRef,
+ def_levels: Option<Vec<i16>>,
+ rep_levels: Option<Vec<i16>>,
+ ) -> Self {
+ Self {
+ data_type,
+ array,
+ def_levels,
+ rep_levels,
+ }
+ }
+}
+
+impl ArrayReader for InMemoryArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, _batch_size: usize) -> Result<ArrayRef> {
+ Ok(self.array.clone())
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.def_levels.as_deref()
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.rep_levels.as_deref()
+ }
+}
+
+/// Iterator for testing reading empty columns
+pub struct EmptyPageIterator {
+ schema: SchemaDescPtr,
+}
+
+impl EmptyPageIterator {
+ pub fn new(schema: SchemaDescPtr) -> Self {
+ EmptyPageIterator { schema }
+ }
+}
+
+impl Iterator for EmptyPageIterator {
+ type Item = Result<Box<dyn PageReader>>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ None
+ }
+}
+
+impl PageIterator for EmptyPageIterator {
+ fn schema(&mut self) -> Result<SchemaDescPtr> {
+ Ok(self.schema.clone())
+ }
+
+ fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+ Ok(self.schema.column(0))
+ }
+}