You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/25 09:48:05 UTC
[arrow-rs] branch master updated: Add FixedLengthByteArrayReader Remove ComplexObjectArrayReader (#2528)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8e4a4553e Add FixedLengthByteArrayReader Remove ComplexObjectArrayReader (#2528)
8e4a4553e is described below
commit 8e4a4553e4330aa1a9d8dd7dbb7293165cc0cc40
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Aug 25 10:47:59 2022 +0100
Add FixedLengthByteArrayReader Remove ComplexObjectArrayReader (#2528)
* Remove ComplexObjectArrayReader add FixedLengthByteArrayReader
* Add Interval support
* Add more tests
* Handle all null array
* Review feedback
---
parquet/src/arrow/array_reader/builder.rs | 80 +--
.../src/arrow/array_reader/complex_object_array.rs | 605 ---------------------
.../src/arrow/array_reader/fixed_len_byte_array.rs | 475 ++++++++++++++++
parquet/src/arrow/array_reader/mod.rs | 8 +-
parquet/src/arrow/arrow_reader/mod.rs | 28 +-
parquet/src/arrow/buffer/converter.rs | 238 --------
parquet/src/arrow/buffer/mod.rs | 1 -
parquet/src/arrow/record_reader/mod.rs | 12 +-
parquet/src/data_type.rs | 12 +
9 files changed, 533 insertions(+), 926 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index d944ff2dc..5f3ce7582 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -17,26 +17,20 @@
use std::sync::Arc;
-use arrow::datatypes::{DataType, IntervalUnit, SchemaRef};
+use arrow::datatypes::{DataType, SchemaRef};
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
+use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
- ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
- PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
-};
-use crate::arrow::buffer::converter::{
- DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
- FixedLenBinaryConverter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
- IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
- IntervalYearMonthConverter,
+ ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader,
+ RowGroupCollection, StructArrayReader,
};
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
- BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
- Int64Type, Int96Type,
+ BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type,
};
use crate::errors::Result;
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
@@ -127,14 +121,12 @@ fn build_primitive_reader(
field: &ParquetField,
row_groups: &dyn RowGroupCollection,
) -> Result<Box<dyn ArrayReader>> {
- let (col_idx, primitive_type, type_len) = match &field.field_type {
+ let (col_idx, primitive_type) = match &field.field_type {
ParquetFieldType::Primitive {
col_idx,
primitive_type,
} => match primitive_type.as_ref() {
- Type::PrimitiveType { type_length, .. } => {
- (*col_idx, primitive_type.clone(), *type_length)
- }
+ Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
Type::GroupType { .. } => unreachable!(),
},
_ => unreachable!(),
@@ -204,61 +196,9 @@ fn build_primitive_reader(
}
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type),
},
- PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
- DataType::Decimal128(precision, scale) => {
- let converter = DecimalFixedLengthByteArrayConverter::new(
- DecimalArrayConverter::new(precision, scale),
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- DecimalFixedLengthByteArrayConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- DataType::Interval(IntervalUnit::DayTime) => {
- let converter =
- IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- _,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- DataType::Interval(IntervalUnit::YearMonth) => {
- let converter =
- IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- _,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- _ => {
- let converter =
- FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- FixedLenBinaryConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- },
+ PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+ make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)
+ }
}
}
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
deleted file mode 100644
index 4f958fea4..000000000
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ /dev/null
@@ -1,605 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::arrow::array_reader::ArrayReader;
-use crate::arrow::buffer::converter::Converter;
-use crate::arrow::schema::parquet_to_arrow_field;
-use crate::column::page::PageIterator;
-use crate::column::reader::ColumnReaderImpl;
-use crate::data_type::DataType;
-use crate::errors::Result;
-use crate::schema::types::ColumnDescPtr;
-use arrow::array::ArrayRef;
-use arrow::datatypes::DataType as ArrowType;
-use std::any::Any;
-use std::marker::PhantomData;
-
-/// Primitive array readers are leaves of array reader tree. They accept page iterator
-/// and read them into primitive arrays.
-pub struct ComplexObjectArrayReader<T, C>
-where
- T: DataType,
- C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
-{
- data_type: ArrowType,
- pages: Box<dyn PageIterator>,
- def_levels_buffer: Option<Vec<i16>>,
- rep_levels_buffer: Option<Vec<i16>>,
- data_buffer: Vec<T::T>,
- column_desc: ColumnDescPtr,
- column_reader: Option<ColumnReaderImpl<T>>,
- converter: C,
- in_progress_def_levels_buffer: Option<Vec<i16>>,
- in_progress_rep_levels_buffer: Option<Vec<i16>>,
- before_consume: bool,
- _parquet_type_marker: PhantomData<T>,
- _converter_marker: PhantomData<C>,
-}
-
-impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
-where
- T: DataType,
- C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
-{
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_data_type(&self) -> &ArrowType {
- &self.data_type
- }
-
- fn read_records(&mut self, batch_size: usize) -> Result<usize> {
- if !self.before_consume {
- self.before_consume = true;
- }
- // Try to initialize column reader
- if self.column_reader.is_none() {
- self.next_column_reader()?;
- }
-
- let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
- data_buffer.resize_with(batch_size, T::T::default);
-
- let mut def_levels_buffer = if self.column_desc.max_def_level() > 0 {
- let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
- buf.resize_with(batch_size, || 0);
- Some(buf)
- } else {
- None
- };
-
- let mut rep_levels_buffer = if self.column_desc.max_rep_level() > 0 {
- let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
- buf.resize_with(batch_size, || 0);
- Some(buf)
- } else {
- None
- };
-
- let mut num_read = 0;
-
- while self.column_reader.is_some() && num_read < batch_size {
- let num_to_read = batch_size - num_read;
- let cur_data_buf = &mut data_buffer[num_read..];
- let cur_def_levels_buf =
- def_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
- let cur_rep_levels_buf =
- rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
- let (data_read, levels_read) =
- self.column_reader.as_mut().unwrap().read_batch(
- num_to_read,
- cur_def_levels_buf,
- cur_rep_levels_buf,
- cur_data_buf,
- )?;
-
- // Fill space
- if levels_read > data_read {
- def_levels_buffer.iter().for_each(|def_levels_buffer| {
- let (mut level_pos, mut data_pos) = (levels_read, data_read);
- while level_pos > 0 && data_pos > 0 {
- if def_levels_buffer[num_read + level_pos - 1]
- == self.column_desc.max_def_level()
- {
- cur_data_buf.swap(level_pos - 1, data_pos - 1);
- level_pos -= 1;
- data_pos -= 1;
- } else {
- level_pos -= 1;
- }
- }
- });
- }
-
- let values_read = levels_read.max(data_read);
- num_read += values_read;
- // current page exhausted && page iterator exhausted
- if values_read < num_to_read && !self.next_column_reader()? {
- break;
- }
- }
- data_buffer.truncate(num_read);
- def_levels_buffer
- .iter_mut()
- .for_each(|buf| buf.truncate(num_read));
- rep_levels_buffer
- .iter_mut()
- .for_each(|buf| buf.truncate(num_read));
-
- if let Some(mut def_levels_buffer) = def_levels_buffer {
- match &mut self.in_progress_def_levels_buffer {
- None => {
- self.in_progress_def_levels_buffer = Some(def_levels_buffer);
- }
- Some(buf) => buf.append(&mut def_levels_buffer),
- }
- }
-
- if let Some(mut rep_levels_buffer) = rep_levels_buffer {
- match &mut self.in_progress_rep_levels_buffer {
- None => {
- self.in_progress_rep_levels_buffer = Some(rep_levels_buffer);
- }
- Some(buf) => buf.append(&mut rep_levels_buffer),
- }
- }
-
- self.data_buffer.append(&mut data_buffer);
-
- Ok(num_read)
- }
-
- fn consume_batch(&mut self) -> Result<ArrayRef> {
- let data: Vec<Option<T::T>> = if self.in_progress_def_levels_buffer.is_some() {
- let data_buffer = std::mem::take(&mut self.data_buffer);
- data_buffer
- .into_iter()
- .zip(self.in_progress_def_levels_buffer.as_ref().unwrap().iter())
- .map(|(t, def_level)| {
- if *def_level == self.column_desc.max_def_level() {
- Some(t)
- } else {
- None
- }
- })
- .collect()
- } else {
- self.data_buffer.iter().map(|x| Some(x.clone())).collect()
- };
-
- let mut array = self.converter.convert(data)?;
-
- if let ArrowType::Dictionary(_, _) = self.data_type {
- array = arrow::compute::cast(&array, &self.data_type)?;
- }
-
- self.data_buffer = vec![];
- self.def_levels_buffer = std::mem::take(&mut self.in_progress_def_levels_buffer);
- self.rep_levels_buffer = std::mem::take(&mut self.in_progress_rep_levels_buffer);
- self.before_consume = false;
-
- Ok(array)
- }
-
- fn skip_records(&mut self, num_records: usize) -> Result<usize> {
- let mut num_read = 0;
- while (self.column_reader.is_some() || self.next_column_reader()?)
- && num_read < num_records
- {
- let remain_to_skip = num_records - num_read;
- let skip = self
- .column_reader
- .as_mut()
- .unwrap()
- .skip_records(remain_to_skip)?;
- num_read += skip;
- // skip < remain_to_skip means end of row group
- // self.next_column_reader() == false means end of file
- if skip < remain_to_skip && !self.next_column_reader()? {
- break;
- }
- }
- Ok(num_read)
- }
-
- fn get_def_levels(&self) -> Option<&[i16]> {
- if self.before_consume {
- self.in_progress_def_levels_buffer.as_deref()
- } else {
- self.def_levels_buffer.as_deref()
- }
- }
-
- fn get_rep_levels(&self) -> Option<&[i16]> {
- if self.before_consume {
- self.in_progress_rep_levels_buffer.as_deref()
- } else {
- self.rep_levels_buffer.as_deref()
- }
- }
-}
-
-impl<T, C> ComplexObjectArrayReader<T, C>
-where
- T: DataType,
- C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
-{
- pub fn new(
- pages: Box<dyn PageIterator>,
- column_desc: ColumnDescPtr,
- converter: C,
- arrow_type: Option<ArrowType>,
- ) -> Result<Self> {
- let data_type = match arrow_type {
- Some(t) => t,
- None => parquet_to_arrow_field(column_desc.as_ref())?
- .data_type()
- .clone(),
- };
-
- Ok(Self {
- data_type,
- pages,
- def_levels_buffer: None,
- rep_levels_buffer: None,
- data_buffer: vec![],
- column_desc,
- column_reader: None,
- converter,
- in_progress_def_levels_buffer: None,
- in_progress_rep_levels_buffer: None,
- before_consume: true,
- _parquet_type_marker: PhantomData,
- _converter_marker: PhantomData,
- })
- }
-
- fn next_column_reader(&mut self) -> Result<bool> {
- Ok(match self.pages.next() {
- Some(page) => {
- self.column_reader =
- Some(ColumnReaderImpl::<T>::new(self.column_desc.clone(), page?));
- true
- }
- None => false,
- })
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
- use crate::basic::Encoding;
- use crate::column::page::Page;
- use crate::data_type::{ByteArray, ByteArrayType};
- use crate::schema::parser::parse_message_type;
- use crate::schema::types::SchemaDescriptor;
- use crate::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
- use arrow::array::StringArray;
- use rand::{thread_rng, Rng};
- use std::sync::Arc;
-
- #[test]
- fn test_complex_array_reader_no_pages() {
- let message_type = "
- message test_schema {
- REPEATED Group test_mid {
- OPTIONAL BYTE_ARRAY leaf (UTF8);
- }
- }
- ";
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
- let column_desc = schema.column(0);
- let pages: Vec<Vec<Page>> = Vec::new();
- let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
-
- let converter = Utf8Converter::new(Utf8ArrayConverter {});
- let mut array_reader =
- ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
- Box::new(page_iterator),
- column_desc,
- converter,
- None,
- )
- .unwrap();
-
- let values_per_page = 100; // this value is arbitrary in this test - the result should always be an array of 0 length
- let array = array_reader.next_batch(values_per_page).unwrap();
- assert_eq!(array.len(), 0);
- }
-
- #[test]
- fn test_complex_array_reader_def_and_rep_levels() {
- // Construct column schema
- let message_type = "
- message test_schema {
- REPEATED Group test_mid {
- OPTIONAL BYTE_ARRAY leaf (UTF8);
- }
- }
- ";
- let num_pages = 2;
- let values_per_page = 100;
- let str_base = "Hello World";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
-
- let max_def_level = schema.column(0).max_def_level();
- let max_rep_level = schema.column(0).max_rep_level();
-
- assert_eq!(max_def_level, 2);
- assert_eq!(max_rep_level, 1);
-
- let mut rng = thread_rng();
- let column_desc = schema.column(0);
- let mut pages: Vec<Vec<Page>> = Vec::new();
-
- let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
- for i in 0..num_pages {
- let mut values = Vec::with_capacity(values_per_page);
-
- for _ in 0..values_per_page {
- let def_level = rng.gen_range(0..max_def_level + 1);
- let rep_level = rng.gen_range(0..max_rep_level + 1);
- if def_level == max_def_level {
- let len = rng.gen_range(1..str_base.len());
- let slice = &str_base[..len];
- values.push(ByteArray::from(slice));
- all_values.push(Some(slice.to_string()));
- } else {
- all_values.push(None)
- }
- rep_levels.push(rep_level);
- def_levels.push(def_level)
- }
-
- let range = i * values_per_page..(i + 1) * values_per_page;
- let mut pb =
- DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
-
- pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
- pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
- pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
-
- let data_page = pb.consume();
- pages.push(vec![data_page]);
- }
-
- let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
-
- let converter = Utf8Converter::new(Utf8ArrayConverter {});
- let mut array_reader =
- ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
- Box::new(page_iterator),
- column_desc,
- converter,
- None,
- )
- .unwrap();
-
- let mut accu_len: usize = 0;
-
- let len = array_reader.read_records(values_per_page / 2).unwrap();
- assert_eq!(len, values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + len)]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + len)]),
- array_reader.get_rep_levels()
- );
- accu_len += len;
- array_reader.consume_batch().unwrap();
-
- // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
- // and the last values_per_page/2 ones are from the second column chunk
- let len = array_reader.read_records(values_per_page).unwrap();
- assert_eq!(len, values_per_page);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + len)]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + len)]),
- array_reader.get_rep_levels()
- );
- let array = array_reader.consume_batch().unwrap();
- let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
- for i in 0..array.len() {
- if array.is_valid(i) {
- assert_eq!(
- all_values[i + accu_len].as_ref().unwrap().as_str(),
- strings.value(i)
- )
- } else {
- assert_eq!(all_values[i + accu_len], None)
- }
- }
- accu_len += len;
-
- // Try to read values_per_page values, however there are only values_per_page/2 values
- let len = array_reader.read_records(values_per_page).unwrap();
- assert_eq!(len, values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + len)]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + len)]),
- array_reader.get_rep_levels()
- );
- array_reader.consume_batch().unwrap();
- }
-
- #[test]
- fn test_complex_array_reader_dict_enc_string() {
- use crate::encodings::encoding::{DictEncoder, Encoder};
- // Construct column schema
- let message_type = "
- message test_schema {
- REPEATED Group test_mid {
- OPTIONAL BYTE_ARRAY leaf (UTF8);
- }
- }
- ";
- let num_pages = 2;
- let values_per_page = 100;
- let str_base = "Hello World";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
- let column_desc = schema.column(0);
- let max_def_level = column_desc.max_def_level();
- let max_rep_level = column_desc.max_rep_level();
-
- assert_eq!(max_def_level, 2);
- assert_eq!(max_rep_level, 1);
-
- let mut rng = thread_rng();
- let mut pages: Vec<Vec<Page>> = Vec::new();
-
- let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
- for i in 0..num_pages {
- let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
- // add data page
- let mut values = Vec::with_capacity(values_per_page);
-
- for _ in 0..values_per_page {
- let def_level = rng.gen_range(0..max_def_level + 1);
- let rep_level = rng.gen_range(0..max_rep_level + 1);
- if def_level == max_def_level {
- let len = rng.gen_range(1..str_base.len());
- let slice = &str_base[..len];
- values.push(ByteArray::from(slice));
- all_values.push(Some(slice.to_string()));
- } else {
- all_values.push(None)
- }
- rep_levels.push(rep_level);
- def_levels.push(def_level)
- }
-
- let range = i * values_per_page..(i + 1) * values_per_page;
- let mut pb =
- DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
- pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
- pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
- let _ = dict_encoder.put(&values);
- let indices = dict_encoder
- .write_indices()
- .expect("write_indices() should be OK");
- pb.add_indices(indices);
- let data_page = pb.consume();
- // for each page log num_values vs actual values in page
- // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
- // add dictionary page
- let dict = dict_encoder
- .write_dict()
- .expect("write_dict() should be OK");
- let dict_page = Page::DictionaryPage {
- buf: dict,
- num_values: dict_encoder.num_entries() as u32,
- encoding: Encoding::RLE_DICTIONARY,
- is_sorted: false,
- };
- pages.push(vec![dict_page, data_page]);
- }
-
- let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
- let converter = Utf8Converter::new(Utf8ArrayConverter {});
- let mut array_reader =
- ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
- Box::new(page_iterator),
- column_desc,
- converter,
- None,
- )
- .unwrap();
-
- let mut accu_len: usize = 0;
-
- // println!("---------- reading a batch of {} values ----------", values_per_page / 2);
- let len = array_reader.read_records(values_per_page / 2).unwrap();
- assert_eq!(len, values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + len)]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + len)]),
- array_reader.get_rep_levels()
- );
- accu_len += len;
- array_reader.consume_batch().unwrap();
-
- // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
- // and the last values_per_page/2 ones are from the second column chunk
- // println!("---------- reading a batch of {} values ----------", values_per_page);
- //let array = array_reader.next_batch(values_per_page).unwrap();
- let len = array_reader.read_records(values_per_page).unwrap();
- assert_eq!(len, values_per_page);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + len)]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + len)]),
- array_reader.get_rep_levels()
- );
- let array = array_reader.consume_batch().unwrap();
- let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
- for i in 0..array.len() {
- if array.is_valid(i) {
- assert_eq!(
- all_values[i + accu_len].as_ref().unwrap().as_str(),
- strings.value(i)
- )
- } else {
- assert_eq!(all_values[i + accu_len], None)
- }
- }
- accu_len += len;
-
- // Try to read values_per_page values, however there are only values_per_page/2 values
- // println!("---------- reading a batch of {} values ----------", values_per_page);
- let len = array_reader.read_records(values_per_page).unwrap();
- assert_eq!(len, values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + len)]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + len)]),
- array_reader.get_rep_levels()
- );
- array_reader.consume_batch().unwrap();
- }
-}
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
new file mode 100644
index 000000000..ba3a02c4f
--- /dev/null
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
+use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ValuesBuffer};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{Encoding, Type};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
+use arrow::array::{
+ ArrayDataBuilder, ArrayRef, Decimal128Array, FixedSizeBinaryArray,
+ IntervalDayTimeArray, IntervalYearMonthArray,
+};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{DataType as ArrowType, IntervalUnit};
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
+pub fn make_fixed_len_byte_array_reader(
+ pages: Box<dyn PageIterator>,
+ column_desc: ColumnDescPtr,
+ arrow_type: Option<ArrowType>,
+) -> Result<Box<dyn ArrayReader>> {
+ // Check if Arrow type is specified, else create it from Parquet type
+ let data_type = match arrow_type {
+ Some(t) => t,
+ None => parquet_to_arrow_field(column_desc.as_ref())?
+ .data_type()
+ .clone(),
+ };
+
+ let byte_length = match column_desc.physical_type() {
+ Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
+ t => {
+ return Err(general_err!(
+ "invalid physical type for fixed length byte array reader - {}",
+ t
+ ))
+ }
+ };
+
+ match &data_type {
+ ArrowType::FixedSizeBinary(_) => {}
+ ArrowType::Decimal128(_, _) => {
+ if byte_length > 16 {
+ return Err(general_err!(
+ "decimal 128 type too large, must be less than 16 bytes, got {}",
+ byte_length
+ ));
+ }
+ }
+ ArrowType::Interval(_) => {
+ if byte_length != 12 {
+ // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
+ return Err(general_err!(
+ "interval type must consist of 12 bytes got {}",
+ byte_length
+ ));
+ }
+ }
+ _ => {
+ return Err(general_err!(
+ "invalid data type for fixed length byte array reader - {}",
+ data_type
+ ))
+ }
+ }
+
+ Ok(Box::new(FixedLenByteArrayReader::new(
+ pages,
+ column_desc,
+ data_type,
+ byte_length,
+ )))
+}
+
+struct FixedLenByteArrayReader {
+ data_type: ArrowType,
+ byte_length: usize,
+ pages: Box<dyn PageIterator>,
+ def_levels_buffer: Option<Buffer>,
+ rep_levels_buffer: Option<Buffer>,
+ record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
+}
+
+impl FixedLenByteArrayReader {
+ fn new(
+ pages: Box<dyn PageIterator>,
+ column_desc: ColumnDescPtr,
+ data_type: ArrowType,
+ byte_length: usize,
+ ) -> Self {
+ Self {
+ data_type,
+ byte_length,
+ pages,
+ def_levels_buffer: None,
+ rep_levels_buffer: None,
+ record_reader: GenericRecordReader::new_with_records(
+ column_desc,
+ FixedLenByteArrayBuffer {
+ buffer: Default::default(),
+ byte_length,
+ },
+ ),
+ }
+ }
+}
+
+impl ArrayReader for FixedLenByteArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+ read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
+ }
+
+ fn consume_batch(&mut self) -> Result<ArrayRef> {
+ let record_data = self.record_reader.consume_record_data();
+
+ let array_data =
+ ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+ .len(self.record_reader.num_values())
+ .add_buffer(record_data)
+ .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+
+ let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
+
+ // TODO: An improvement might be to do this conversion on read
+ let array = match &self.data_type {
+ ArrowType::Decimal128(p, s) => {
+ let decimal = binary
+ .iter()
+ .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
+ .collect::<Decimal128Array>()
+ .with_precision_and_scale(*p, *s)?;
+
+ Arc::new(decimal)
+ }
+ ArrowType::Interval(unit) => {
+ // An interval is stored as 3x 32-bit unsigned integers storing months, days,
+ // and milliseconds
+ match unit {
+ IntervalUnit::YearMonth => Arc::new(
+ binary
+ .iter()
+ .map(|o| {
+ o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
+ })
+ .collect::<IntervalYearMonthArray>(),
+ ) as ArrayRef,
+ IntervalUnit::DayTime => Arc::new(
+ binary
+ .iter()
+ .map(|o| {
+ o.map(|b| {
+ i64::from_le_bytes(b[4..12].try_into().unwrap())
+ })
+ })
+ .collect::<IntervalDayTimeArray>(),
+ ) as ArrayRef,
+ IntervalUnit::MonthDayNano => {
+ return Err(nyi_err!("MonthDayNano intervals not supported"));
+ }
+ }
+ }
+ _ => Arc::new(binary) as ArrayRef,
+ };
+
+ self.def_levels_buffer = self.record_reader.consume_def_levels();
+ self.rep_levels_buffer = self.record_reader.consume_rep_levels();
+ self.record_reader.reset();
+
+ Ok(array)
+ }
+
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ }
+}
+
+struct FixedLenByteArrayBuffer {
+ buffer: ScalarBuffer<u8>,
+ /// The length of each element in bytes
+ byte_length: usize,
+}
+
+impl ValuesBufferSlice for FixedLenByteArrayBuffer {
+ fn capacity(&self) -> usize {
+ usize::MAX
+ }
+}
+
+impl BufferQueue for FixedLenByteArrayBuffer {
+ type Output = Buffer;
+ type Slice = Self;
+
+ fn split_off(&mut self, len: usize) -> Self::Output {
+ self.buffer.split_off(len * self.byte_length)
+ }
+
+ fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+ self
+ }
+
+ fn set_len(&mut self, len: usize) {
+ assert_eq!(self.buffer.len(), len * self.byte_length);
+ }
+}
+
+impl ValuesBuffer for FixedLenByteArrayBuffer {
+ fn pad_nulls(
+ &mut self,
+ read_offset: usize,
+ values_read: usize,
+ levels_read: usize,
+ valid_mask: &[u8],
+ ) {
+ assert_eq!(
+ self.buffer.len(),
+ (read_offset + values_read) * self.byte_length
+ );
+ self.buffer
+ .resize((read_offset + levels_read) * self.byte_length);
+
+ let slice = self.buffer.as_slice_mut();
+
+ let values_range = read_offset..read_offset + values_read;
+ for (value_pos, level_pos) in
+ values_range.rev().zip(iter_set_bits_rev(valid_mask))
+ {
+ debug_assert!(level_pos >= value_pos);
+ if level_pos <= value_pos {
+ break;
+ }
+
+ let level_pos_bytes = level_pos * self.byte_length;
+ let value_pos_bytes = value_pos * self.byte_length;
+
+ for i in 0..self.byte_length {
+ slice[level_pos_bytes + i] = slice[value_pos_bytes + i]
+ }
+ }
+ }
+}
+
+struct ValueDecoder {
+ byte_length: usize,
+ dict_page: Option<ByteBufferPtr>,
+ decoder: Option<Decoder>,
+}
+
+impl ColumnValueDecoder for ValueDecoder {
+ type Slice = FixedLenByteArrayBuffer;
+
+ fn new(col: &ColumnDescPtr) -> Self {
+ Self {
+ byte_length: col.type_length() as usize,
+ dict_page: None,
+ decoder: None,
+ }
+ }
+
+ fn set_dict(
+ &mut self,
+ buf: ByteBufferPtr,
+ num_values: u32,
+ encoding: Encoding,
+ _is_sorted: bool,
+ ) -> Result<()> {
+ if !matches!(
+ encoding,
+ Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+ ) {
+ return Err(nyi_err!(
+ "Invalid/Unsupported encoding type for dictionary: {}",
+ encoding
+ ));
+ }
+ let expected_len = num_values as usize * self.byte_length;
+ if expected_len > buf.len() {
+ return Err(general_err!(
+ "too few bytes in dictionary page, expected {} got {}",
+ expected_len,
+ buf.len()
+ ));
+ }
+
+ self.dict_page = Some(buf);
+ Ok(())
+ }
+
+ fn set_data(
+ &mut self,
+ encoding: Encoding,
+ data: ByteBufferPtr,
+ num_levels: usize,
+ num_values: Option<usize>,
+ ) -> Result<()> {
+ self.decoder = Some(match encoding {
+ Encoding::PLAIN => Decoder::Plain {
+ buf: data,
+ offset: 0,
+ },
+ Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
+ decoder: DictIndexDecoder::new(data, num_levels, num_values),
+ },
+ Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
+ decoder: DeltaByteArrayDecoder::new(data)?,
+ },
+ _ => {
+ return Err(general_err!(
+ "unsupported encoding for fixed length byte array: {}",
+ encoding
+ ))
+ }
+ });
+ Ok(())
+ }
+
+ fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+ assert_eq!(self.byte_length, out.byte_length);
+
+ let len = range.end - range.start;
+ match self.decoder.as_mut().unwrap() {
+ Decoder::Plain { offset, buf } => {
+ let to_read =
+ (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
+ let end_offset = *offset + to_read * self.byte_length;
+ out.buffer
+ .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
+ *offset = end_offset;
+ Ok(to_read)
+ }
+ Decoder::Dict { decoder } => {
+ let dict = self.dict_page.as_ref().unwrap();
+ // An empty dictionary page implies every value in this column chunk is NULL
+ if dict.is_empty() {
+ return Ok(0);
+ }
+
+ decoder.read(len, |keys| {
+ out.buffer.reserve(keys.len() * self.byte_length);
+ for key in keys {
+ let offset = *key as usize * self.byte_length;
+ let val = &dict.as_ref()[offset..offset + self.byte_length];
+ out.buffer.extend_from_slice(val);
+ }
+ Ok(())
+ })
+ }
+ Decoder::Delta { decoder } => {
+ let to_read = len.min(decoder.remaining());
+ out.buffer.reserve(to_read * self.byte_length);
+
+ decoder.read(to_read, |slice| {
+ if slice.len() != self.byte_length {
+ return Err(general_err!(
+ "encountered array with incorrect length, got {} expected {}",
+ slice.len(),
+ self.byte_length
+ ));
+ }
+ out.buffer.extend_from_slice(slice);
+ Ok(())
+ })
+ }
+ }
+ }
+
+ fn skip_values(&mut self, num_values: usize) -> Result<usize> {
+ match self.decoder.as_mut().unwrap() {
+ Decoder::Plain { offset, buf } => {
+ let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
+ *offset += to_read * self.byte_length;
+ Ok(to_read)
+ }
+ Decoder::Dict { decoder } => decoder.skip(num_values),
+ Decoder::Delta { decoder } => decoder.skip(num_values),
+ }
+ }
+}
+
+enum Decoder {
+ Plain { buf: ByteBufferPtr, offset: usize },
+ Dict { decoder: DictIndexDecoder },
+ Delta { decoder: DeltaByteArrayDecoder },
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+ use crate::arrow::ArrowWriter;
+ use arrow::array::{Array, Decimal128Array, ListArray};
+ use arrow::datatypes::Field;
+ use arrow::error::Result as ArrowResult;
+ use arrow::record_batch::RecordBatch;
+ use bytes::Bytes;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_decimal_list() {
+ let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
+
+ // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
+ let data = ArrayDataBuilder::new(ArrowType::List(Box::new(Field::new(
+ "item",
+ decimals.data_type().clone(),
+ false,
+ ))))
+ .len(7)
+ .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
+ .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
+ .child_data(vec![decimals.into_data()])
+ .build()
+ .unwrap();
+
+ let written = RecordBatch::try_from_iter([(
+ "list",
+ Arc::new(ListArray::from(data)) as ArrayRef,
+ )])
+ .unwrap();
+
+ let mut buffer = Vec::with_capacity(1024);
+ let mut writer =
+ ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+ writer.write(&written).unwrap();
+ writer.close().unwrap();
+
+ let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+
+ assert_eq!(&written.slice(0, 3), &read[0]);
+ assert_eq!(&written.slice(3, 3), &read[1]);
+ assert_eq!(&written.slice(6, 1), &read[2]);
+ }
+}
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 480b4d4df..3740f0fae 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -33,8 +33,8 @@ use crate::schema::types::SchemaDescPtr;
mod builder;
mod byte_array;
mod byte_array_dictionary;
-mod complex_object_array;
mod empty_array;
+mod fixed_len_byte_array;
mod list_array;
mod map_array;
mod null_array;
@@ -47,7 +47,7 @@ mod test_util;
pub use builder::build_array_reader;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
-pub use complex_object_array::ComplexObjectArrayReader;
+pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
pub use list_array::ListArrayReader;
pub use map_array::MapArrayReader;
pub use null_array::NullArrayReader;
@@ -181,7 +181,7 @@ fn read_records<V, CV>(
batch_size: usize,
) -> Result<usize>
where
- V: ValuesBuffer + Default,
+ V: ValuesBuffer,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
let mut records_read = 0usize;
@@ -215,7 +215,7 @@ fn skip_records<V, CV>(
batch_size: usize,
) -> Result<usize>
where
- V: ValuesBuffer + Default,
+ V: ValuesBuffer,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
let mut records_skipped = 0usize;
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index c1ee7a474..76e247ae1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -627,9 +627,6 @@ mod tests {
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
- use crate::arrow::buffer::converter::{
- Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
- };
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
@@ -836,24 +833,41 @@ mod tests {
#[test]
fn test_fixed_length_binary_column_reader() {
- let converter = FixedSizeArrayConverter::new(20);
run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
20,
ConvertedType::NONE,
None,
- |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()),
+ |vals| {
+ let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
+ for val in vals {
+ match val {
+ Some(b) => builder.append_value(b).unwrap(),
+ None => builder.append_null(),
+ }
+ }
+ Arc::new(builder.finish())
+ },
&[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
);
}
#[test]
fn test_interval_day_time_column_reader() {
- let converter = IntervalDayTimeArrayConverter {};
run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
12,
ConvertedType::INTERVAL,
None,
- |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()),
+ |vals| {
+ Arc::new(
+ vals.iter()
+ .map(|x| {
+ x.as_ref().map(|b| {
+ i64::from_le_bytes(b.as_ref()[4..12].try_into().unwrap())
+ })
+ })
+ .collect::<IntervalDayTimeArray>(),
+ )
+ },
&[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
);
}
diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs
deleted file mode 100644
index 3db0be4e9..000000000
--- a/parquet/src/arrow/buffer/converter.rs
+++ /dev/null
@@ -1,238 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::data_type::FixedLenByteArray;
-use arrow::array::{
- Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
- IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
- IntervalYearMonthBuilder,
-};
-use std::sync::Arc;
-
-use crate::errors::Result;
-use std::marker::PhantomData;
-
-use crate::arrow::buffer::bit_util::sign_extend_be;
-#[cfg(test)]
-use crate::data_type::ByteArray;
-
-#[cfg(test)]
-use arrow::array::{StringArray, StringBuilder};
-
-/// A converter is used to consume record reader's content and convert it to arrow
-/// primitive array.
-pub trait Converter<S, T> {
- /// This method converts record reader's buffered content into arrow array.
- /// It will consume record reader's data, but will not reset record reader's
- /// state.
- fn convert(&self, source: S) -> Result<T>;
-}
-
-pub struct FixedSizeArrayConverter {
- byte_width: i32,
-}
-
-impl FixedSizeArrayConverter {
- pub fn new(byte_width: i32) -> Self {
- Self { byte_width }
- }
-}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, FixedSizeBinaryArray>
- for FixedSizeArrayConverter
-{
- fn convert(
- &self,
- source: Vec<Option<FixedLenByteArray>>,
- ) -> Result<FixedSizeBinaryArray> {
- let mut builder =
- FixedSizeBinaryBuilder::with_capacity(source.len(), self.byte_width);
- for v in source {
- match v {
- Some(array) => builder.append_value(array.data())?,
- None => builder.append_null(),
- }
- }
-
- Ok(builder.finish())
- }
-}
-
-pub struct DecimalArrayConverter {
- precision: u8,
- scale: u8,
-}
-
-impl DecimalArrayConverter {
- pub fn new(precision: u8, scale: u8) -> Self {
- Self { precision, scale }
- }
-}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
- for DecimalArrayConverter
-{
- fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<Decimal128Array> {
- let array = source
- .into_iter()
- .map(|array| array.map(|array| from_bytes_to_i128(array.data())))
- .collect::<Decimal128Array>()
- .with_precision_and_scale(self.precision, self.scale)?;
-
- Ok(array)
- }
-}
-
-// Convert the bytes array to i128.
-// The endian of the input bytes array must be big-endian.
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
- // The bytes array are from parquet file and must be the big-endian.
- // The endian is defined by parquet format, and the reference document
- // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
- i128::from_be_bytes(sign_extend_be(b))
-}
-
-/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval,
-/// and interprets it as an i32 value representing the Arrow YearMonth value
-pub struct IntervalYearMonthArrayConverter {}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, IntervalYearMonthArray>
- for IntervalYearMonthArrayConverter
-{
- fn convert(
- &self,
- source: Vec<Option<FixedLenByteArray>>,
- ) -> Result<IntervalYearMonthArray> {
- let mut builder = IntervalYearMonthBuilder::with_capacity(source.len());
- for v in source {
- match v {
- Some(array) => builder.append_value(i32::from_le_bytes(
- array.data()[0..4].try_into().unwrap(),
- )),
- None => builder.append_null(),
- }
- }
-
- Ok(builder.finish())
- }
-}
-
-/// An Arrow Interval converter, which reads the last 8 bytes of a Parquet interval,
-/// and interprets it as an i32 value representing the Arrow DayTime value
-pub struct IntervalDayTimeArrayConverter {}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, IntervalDayTimeArray>
- for IntervalDayTimeArrayConverter
-{
- fn convert(
- &self,
- source: Vec<Option<FixedLenByteArray>>,
- ) -> Result<IntervalDayTimeArray> {
- let mut builder = IntervalDayTimeBuilder::with_capacity(source.len());
- for v in source {
- match v {
- Some(array) => builder.append_value(i64::from_le_bytes(
- array.data()[4..12].try_into().unwrap(),
- )),
- None => builder.append_null(),
- }
- }
-
- Ok(builder.finish())
- }
-}
-
-#[cfg(test)]
-pub struct Utf8ArrayConverter {}
-
-#[cfg(test)]
-impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
- fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<StringArray> {
- let data_size = source
- .iter()
- .map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0))
- .sum();
-
- let mut builder = StringBuilder::with_capacity(source.len(), data_size);
- for v in source {
- match v {
- Some(array) => builder.append_value(array.as_utf8()?),
- None => builder.append_null(),
- }
- }
-
- Ok(builder.finish())
- }
-}
-
-#[cfg(test)]
-pub type Utf8Converter =
- ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
-
-pub type FixedLenBinaryConverter = ArrayRefConverter<
- Vec<Option<FixedLenByteArray>>,
- FixedSizeBinaryArray,
- FixedSizeArrayConverter,
->;
-pub type IntervalYearMonthConverter = ArrayRefConverter<
- Vec<Option<FixedLenByteArray>>,
- IntervalYearMonthArray,
- IntervalYearMonthArrayConverter,
->;
-pub type IntervalDayTimeConverter = ArrayRefConverter<
- Vec<Option<FixedLenByteArray>>,
- IntervalDayTimeArray,
- IntervalDayTimeArrayConverter,
->;
-
-pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter<
- Vec<Option<FixedLenByteArray>>,
- Decimal128Array,
- DecimalArrayConverter,
->;
-
-pub struct ArrayRefConverter<S, A, C> {
- _source: PhantomData<S>,
- _array: PhantomData<A>,
- converter: C,
-}
-
-impl<S, A, C> ArrayRefConverter<S, A, C>
-where
- A: Array + 'static,
- C: Converter<S, A> + 'static,
-{
- pub fn new(converter: C) -> Self {
- Self {
- _source: PhantomData,
- _array: PhantomData,
- converter,
- }
- }
-}
-
-impl<S, A, C> Converter<S, ArrayRef> for ArrayRefConverter<S, A, C>
-where
- A: Array + 'static,
- C: Converter<S, A> + 'static,
-{
- fn convert(&self, source: S) -> Result<ArrayRef> {
- self.converter
- .convert(source)
- .map(|array| Arc::new(array) as ArrayRef)
- }
-}
diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs
index 5ee89aa1a..cbc795d94 100644
--- a/parquet/src/arrow/buffer/mod.rs
+++ b/parquet/src/arrow/buffer/mod.rs
@@ -18,6 +18,5 @@
//! Logic for reading data into arrow buffers
pub mod bit_util;
-pub mod converter;
pub mod dictionary_buffer;
pub mod offset_buffer;
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 18b4c9e07..6c1c61039 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -78,13 +78,23 @@ where
{
/// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
+ Self::new_with_records(desc, V::default())
+ }
+}
+
+impl<V, CV> GenericRecordReader<V, CV>
+where
+ V: ValuesBuffer,
+ CV: ColumnValueDecoder<Slice = V::Slice>,
+{
+ pub fn new_with_records(desc: ColumnDescPtr, records: V) -> Self {
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
Self {
- records: Default::default(),
+ records,
def_levels,
rep_levels,
column_reader: None,
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 35fec60a0..440160039 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -1208,6 +1208,18 @@ make_type!(
mem::size_of::<FixedLenByteArray>()
);
+impl AsRef<[u8]> for ByteArray {
+ fn as_ref(&self) -> &[u8] {
+ self.as_bytes()
+ }
+}
+
+impl AsRef<[u8]> for FixedLenByteArray {
+ fn as_ref(&self) -> &[u8] {
+ self.as_bytes()
+ }
+}
+
impl FromBytes for Int96 {
type Buffer = [u8; 12];
fn from_le_bytes(bs: Self::Buffer) -> Self {