You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/13 17:36:04 UTC
[arrow-rs] branch master updated: Fix Parquet Arrow Schema Inference (#1682)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5b154ea40 Fix Parquet Arrow Schema Inference (#1682)
5b154ea40 is described below
commit 5b154ea40314dc2f09babbb363bf7f1fe439d4eb
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri May 13 18:35:56 2022 +0100
Fix Parquet Arrow Schema Inference (#1682)
* Separate parquet -> arrow conversion logic (#1655)
Don't treat embedded arrow schema as authoritative (#1663)
Fix projection of nested parquet files (#1652) (#1654)
Fix schema inference for repeated fields (#1681)
Support reading alternative list representations from parquet (#1680)
* Add more tests
* Pass pointers by reference
* More docs
* Fix lint
* Review feedback
* Review feedback
* Fix test failures related to #1697
---
parquet/src/arrow/array_reader/builder.rs | 859 ++++++++-------------------
parquet/src/arrow/array_reader/list_array.rs | 13 +-
parquet/src/arrow/arrow_reader.rs | 36 ++
parquet/src/arrow/schema.rs | 725 ++++------------------
parquet/src/arrow/schema/complex.rs | 599 +++++++++++++++++++
parquet/src/arrow/schema/primitive.rs | 266 +++++++++
parquet/src/errors.rs | 8 +
7 files changed, 1279 insertions(+), 1227 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index af2896350..ab5800672 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef};
+use arrow::datatypes::{DataType, IntervalUnit, SchemaRef};
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::{
@@ -32,15 +31,14 @@ use crate::arrow::converter::{
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
-use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
+use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
+use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type,
Int96Type,
};
-use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
-use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
-use crate::schema::visitor::TypeVisitor;
+use crate::errors::Result;
+use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
@@ -52,435 +50,217 @@ pub fn build_array_reader<T>(
where
T: IntoIterator<Item = usize>,
{
- let mut leaves = HashMap::<*const Type, usize>::new();
-
- let mut filtered_root_names = HashSet::<String>::new();
-
- for c in column_indices {
- let column = parquet_schema.column(c).self_type() as *const Type;
-
- leaves.insert(column, c);
-
- let root = parquet_schema.get_column_root_ptr(c);
- filtered_root_names.insert(root.name().to_string());
+ let field = convert_schema(
+ parquet_schema.as_ref(),
+ column_indices,
+ Some(arrow_schema.as_ref()),
+ )?;
+
+ match &field {
+ Some(field) => build_reader(field, row_groups.as_ref()),
+ None => Ok(make_empty_array_reader(row_groups.num_rows())),
}
-
- // Only pass root fields that take part in the projection
- // to avoid traversal of columns that are not read.
- // TODO: also prune unread parts of the tree in child structures
- let filtered_root_fields = parquet_schema
- .root_schema()
- .get_fields()
- .iter()
- .filter(|field| filtered_root_names.contains(field.name()))
- .cloned()
- .collect::<Vec<_>>();
-
- let proj = Type::GroupType {
- basic_info: parquet_schema.root_schema().get_basic_info().clone(),
- fields: filtered_root_fields,
- };
-
- ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
- .build_array_reader()
}
-/// Used to build array reader.
-struct ArrayReaderBuilder {
- root_schema: TypePtr,
- arrow_schema: Arc<Schema>,
- // Key: columns that need to be included in final array builder
- // Value: column index in schema
- columns_included: Arc<HashMap<*const Type, usize>>,
- row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+ field: &ParquetField,
+ row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+ match field.field_type {
+ ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+ ParquetFieldType::Group { .. } => match &field.arrow_type {
+ DataType::Map(_, _) => build_map_reader(field, row_groups),
+ DataType::Struct(_) => build_struct_reader(field, row_groups),
+ DataType::List(_) => build_list_reader(field, false, row_groups),
+ DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+ d => unimplemented!("reading group type {} not implemented", d),
+ },
+ }
}
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
- def_level: i16,
- rep_level: i16,
- path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+ field: &ParquetField,
+ row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+ let children = field.children().unwrap();
+ assert_eq!(children.len(), 2);
+
+ let key_reader = build_reader(&children[0], row_groups)?;
+ let value_reader = build_reader(&children[1], row_groups)?;
+
+ Ok(Box::new(MapArrayReader::new(
+ key_reader,
+ value_reader,
+ field.arrow_type.clone(),
+ field.def_level,
+ field.rep_level,
+ )))
}
-impl Default for ArrayReaderBuilderContext {
- fn default() -> Self {
- Self {
- def_level: 0i16,
- rep_level: 0i16,
- path: ColumnPath::new(Vec::new()),
- }
+/// Build array reader for list type.
+fn build_list_reader(
+ field: &ParquetField,
+ is_large: bool,
+ row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+ let children = field.children().unwrap();
+ assert_eq!(children.len(), 1);
+
+ let data_type = field.arrow_type.clone();
+ let item_reader = build_reader(&children[0], row_groups)?;
+ let item_type = item_reader.get_data_type().clone();
+
+ match is_large {
+ false => Ok(Box::new(ListArrayReader::<i32>::new(
+ item_reader,
+ data_type,
+ item_type,
+ field.def_level,
+ field.rep_level,
+ field.nullable,
+ )) as _),
+ true => Ok(Box::new(ListArrayReader::<i64>::new(
+ item_reader,
+ data_type,
+ item_type,
+ field.def_level,
+ field.rep_level,
+ field.nullable,
+ )) as _),
}
}
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
- for ArrayReaderBuilder
-{
- /// Build array reader for primitive type.
- fn visit_primitive(
- &mut self,
- cur_type: TypePtr,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- if self.is_included(cur_type.as_ref()) {
- let mut new_context = context.clone();
- new_context.path.append(vec![cur_type.name().to_string()]);
-
- let null_mask_only = match cur_type.get_basic_info().repetition() {
- Repetition::REPEATED => {
- return Err(ArrowError(format!(
- "Reading repeated primitive ({:?}) is not supported yet!",
- cur_type.name()
- )));
- }
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
-
- // Can just compute null mask if no parent
- context.def_level == 0 && context.rep_level == 0
- }
- _ => false,
- };
-
- let reader = self.build_for_primitive_type_inner(
- cur_type,
- &new_context,
- null_mask_only,
- )?;
-
- Ok(Some(reader))
- } else {
- Ok(None)
- }
- }
-
- /// Build array reader for struct type.
- fn visit_struct(
- &mut self,
- cur_type: Arc<Type>,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- let mut new_context = context.clone();
- new_context.path.append(vec![cur_type.name().to_string()]);
-
- if cur_type.get_basic_info().has_repetition() {
- match cur_type.get_basic_info().repetition() {
- Repetition::REPEATED => {
- return Err(ArrowError(format!(
- "Reading repeated struct ({:?}) is not supported yet!",
- cur_type.name(),
- )))
- }
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
- }
- Repetition::REQUIRED => {}
- }
- }
-
- self.build_for_struct_type_inner(&cur_type, &new_context)
- }
-
- /// Build array reader for map type.
- fn visit_map(
- &mut self,
- map_type: Arc<Type>,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- // Add map type to context
- let mut new_context = context.clone();
- new_context.path.append(vec![map_type.name().to_string()]);
-
- match map_type.get_basic_info().repetition() {
- Repetition::REQUIRED => {}
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
- }
- Repetition::REPEATED => {
- return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+ field: &ParquetField,
+ row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+ let (col_idx, primitive_type, type_len) = match &field.field_type {
+ ParquetFieldType::Primitive {
+ col_idx,
+ primitive_type,
+ } => match primitive_type.as_ref() {
+ Type::PrimitiveType { type_length, .. } => {
+ (*col_idx, primitive_type.clone(), *type_length)
}
- }
-
- if map_type.get_fields().len() != 1 {
- return Err(ArrowError(format!(
- "Map field must have exactly one key_value child, found {}",
- map_type.get_fields().len()
- )));
- }
-
- // Add map entry (key_value) to context
- let map_key_value = &map_type.get_fields()[0];
- if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
- return Err(ArrowError(
- "Child of map field must be repeated".to_string(),
- ));
- }
-
- new_context
- .path
- .append(vec![map_key_value.name().to_string()]);
-
- new_context.rep_level += 1;
- new_context.def_level += 1;
-
- if map_key_value.get_fields().len() != 2 {
- // According to the specification the values are optional (#1642)
- return Err(ArrowError(format!(
- "Child of map field must have two children, found {}",
- map_key_value.get_fields().len()
- )));
- }
-
- // Get key and value, and create context for each
- let map_key = &map_key_value.get_fields()[0];
- let map_value = &map_key_value.get_fields()[1];
-
- if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
- return Err(ArrowError("Map keys must be required".to_string()));
- }
-
- if map_value.get_basic_info().repetition() == Repetition::REPEATED {
- return Err(ArrowError("Map values cannot be repeated".to_string()));
- }
-
- let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
- let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
- let arrow_type = self
- .arrow_schema
- .field_with_name(map_type.name())
- .ok()
- .map(|f| f.data_type().to_owned())
- .unwrap_or_else(|| {
- ArrowType::Map(
- Box::new(Field::new(
- map_key_value.name(),
- ArrowType::Struct(vec![
- Field::new(
- map_key.name(),
- key_reader.get_data_type().clone(),
- false,
- ),
- Field::new(
- map_value.name(),
- value_reader.get_data_type().clone(),
- map_value.is_optional(),
- ),
- ]),
- map_type.is_optional(),
- )),
- false,
- )
- });
-
- let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
- key_reader,
- value_reader,
- arrow_type,
- new_context.def_level,
- new_context.rep_level,
- ));
-
- Ok(Some(key_array_reader))
- }
-
- /// Build array reader for list type.
- fn visit_list_with_item(
- &mut self,
- list_type: Arc<Type>,
- item_type: Arc<Type>,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- let mut new_context = context.clone();
- new_context.path.append(vec![list_type.name().to_string()]);
-
- // If the list is nullable
- let nullable = match list_type.get_basic_info().repetition() {
- Repetition::REQUIRED => false,
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
- true
- }
- Repetition::REPEATED => {
- return Err(general_err!("List type cannot be repeated"))
- }
- };
-
- if list_type.get_fields().len() != 1 {
- return Err(ArrowError(format!(
- "List field must have exactly one child, found {}",
- list_type.get_fields().len()
- )));
- }
- let mut list_child = &list_type.get_fields()[0];
-
- if list_child.get_basic_info().repetition() != Repetition::REPEATED {
- return Err(ArrowError("List child must be repeated".to_string()));
- }
-
- // The repeated field
- new_context.rep_level += 1;
- new_context.def_level += 1;
-
- match self.dispatch(item_type, &new_context) {
- Ok(Some(item_reader)) => {
- let item_type = item_reader.get_data_type().clone();
-
- // a list is a group type with a single child. The list child's
- // name comes from the child's field name.
- // if the child's name is "list" and it has a child, then use this child
- if list_child.name() == "list" && !list_child.get_fields().is_empty() {
- list_child = list_child.get_fields().first().unwrap();
- }
-
- let arrow_type = self
- .arrow_schema
- .field_with_name(list_type.name())
- .ok()
- .map(|f| f.data_type().to_owned())
- .unwrap_or_else(|| {
- ArrowType::List(Box::new(Field::new(
- list_child.name(),
- item_type.clone(),
- list_child.is_optional(),
- )))
- });
-
- let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
- ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
- item_reader,
- arrow_type,
- item_type,
- new_context.def_level,
- new_context.rep_level,
- nullable,
- )),
- ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
- item_reader,
- arrow_type,
- item_type,
- new_context.def_level,
- new_context.rep_level,
- nullable,
- )),
- _ => {
- return Err(ArrowError(format!(
- "creating ListArrayReader with type {:?} should be unreachable",
- arrow_type
- )))
- }
- };
-
- Ok(Some(list_array_reader))
- }
- result => result,
- }
- }
-}
-
-impl<'a> ArrayReaderBuilder {
- /// Construct array reader builder.
- fn new(
- root_schema: TypePtr,
- arrow_schema: Arc<Schema>,
- columns_included: Arc<HashMap<*const Type, usize>>,
- file_reader: Box<dyn RowGroupCollection>,
- ) -> Self {
- Self {
- root_schema,
- arrow_schema,
- columns_included,
- row_groups: file_reader,
- }
- }
-
- /// Main entry point.
- fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
- let context = ArrayReaderBuilderContext::default();
-
- match self.visit_struct(self.root_schema.clone(), &context)? {
- Some(reader) => Ok(reader),
- None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
- }
- }
-
- // Utility functions
-
- /// Check whether one column in included in this array reader builder.
- fn is_included(&self, t: &Type) -> bool {
- self.columns_included.contains_key(&(t as *const Type))
- }
-
- /// Creates primitive array reader for each primitive type.
- fn build_for_primitive_type_inner(
- &self,
- cur_type: TypePtr,
- context: &'a ArrayReaderBuilderContext,
- null_mask_only: bool,
- ) -> Result<Box<dyn ArrayReader>> {
- let column_desc = Arc::new(ColumnDescriptor::new(
- cur_type.clone(),
- context.def_level,
- context.rep_level,
- context.path.clone(),
- ));
-
- let page_iterator = self
- .row_groups
- .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
-
- let arrow_type: Option<ArrowType> = self
- .get_arrow_field(&cur_type, context)
- .map(|f| f.data_type().clone());
+ Type::GroupType { .. } => unreachable!(),
+ },
+ _ => unreachable!(),
+ };
- match cur_type.get_physical_type() {
- PhysicalType::BOOLEAN => Ok(Box::new(
- PrimitiveArrayReader::<BoolType>::new_with_options(
+ let physical_type = primitive_type.get_physical_type();
+
+ // We don't track the column path in ParquetField as it adds a potential source
+ // of bugs when the arrow mapping converts more than one level in the parquet
+ // schema into a single arrow field.
+ //
+ // None of the readers actually use this field, but it is required for this type,
+ // so just stick a placeholder in
+ let column_desc = Arc::new(ColumnDescriptor::new(
+ primitive_type,
+ field.def_level,
+ field.rep_level,
+ ColumnPath::new(vec![]),
+ ));
+
+ let page_iterator = row_groups.column_chunks(col_idx)?;
+ let null_mask_only = field.def_level == 1 && field.nullable;
+ let arrow_type = Some(field.arrow_type.clone());
+
+ match physical_type {
+ PhysicalType::BOOLEAN => Ok(Box::new(
+ PrimitiveArrayReader::<BoolType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::INT32 => {
+ if let Some(DataType::Null) = arrow_type {
+ Ok(Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::INT32 => {
- if let Some(ArrowType::Null) = arrow_type {
- Ok(Box::new(NullArrayReader::<Int32Type>::new(
+ )?))
+ } else {
+ Ok(Box::new(
+ PrimitiveArrayReader::<Int32Type>::new_with_options(
page_iterator,
column_desc,
- )?))
+ arrow_type,
+ null_mask_only,
+ )?,
+ ))
+ }
+ }
+ PhysicalType::INT64 => Ok(Box::new(
+ PrimitiveArrayReader::<Int64Type>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::INT96 => {
+ // get the optional timezone information from arrow type
+ let timezone = arrow_type.as_ref().and_then(|data_type| {
+ if let DataType::Timestamp(_, tz) = data_type {
+ tz.clone()
} else {
- Ok(Box::new(
- PrimitiveArrayReader::<Int32Type>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- ))
+ None
}
- }
- PhysicalType::INT64 => Ok(Box::new(
- PrimitiveArrayReader::<Int64Type>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::INT96 => {
- // get the optional timezone information from arrow type
- let timezone = arrow_type.as_ref().and_then(|data_type| {
- if let ArrowType::Timestamp(_, tz) = data_type {
- tz.clone()
- } else {
- None
- }
- });
- let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+ });
+ let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+ Ok(Box::new(ComplexObjectArrayReader::<
+ Int96Type,
+ Int96Converter,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ PhysicalType::FLOAT => Ok(Box::new(
+ PrimitiveArrayReader::<FloatType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::DOUBLE => Ok(Box::new(
+ PrimitiveArrayReader::<DoubleType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::BYTE_ARRAY => match arrow_type {
+ Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ ),
+ _ => make_byte_array_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ ),
+ },
+ PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
+ DataType::Decimal(precision, scale) => {
+ let converter = DecimalConverter::new(DecimalArrayConverter::new(
+ precision as i32,
+ scale as i32,
+ ));
Ok(Box::new(ComplexObjectArrayReader::<
- Int96Type,
- Int96Converter,
+ FixedLenByteArrayType,
+ DecimalConverter,
>::new(
page_iterator,
column_desc,
@@ -488,47 +268,38 @@ impl<'a> ArrayReaderBuilder {
arrow_type,
)?))
}
- PhysicalType::FLOAT => Ok(Box::new(
- PrimitiveArrayReader::<FloatType>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::DOUBLE => Ok(Box::new(
- PrimitiveArrayReader::<DoubleType>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::BYTE_ARRAY => match arrow_type {
- Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+ DataType::Interval(IntervalUnit::DayTime) => {
+ let converter =
+ IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
+ Ok(Box::new(ComplexObjectArrayReader::<
+ FixedLenByteArrayType,
+ _,
+ >::new(
page_iterator,
column_desc,
+ converter,
arrow_type,
- null_mask_only,
- ),
- _ => make_byte_array_reader(
+ )?))
+ }
+ DataType::Interval(IntervalUnit::YearMonth) => {
+ let converter =
+ IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
+ Ok(Box::new(ComplexObjectArrayReader::<
+ FixedLenByteArrayType,
+ _,
+ >::new(
page_iterator,
column_desc,
+ converter,
arrow_type,
- null_mask_only,
- ),
- },
- PhysicalType::FIXED_LEN_BYTE_ARRAY
- if cur_type.get_basic_info().converted_type()
- == ConvertedType::DECIMAL =>
- {
- let converter = DecimalConverter::new(DecimalArrayConverter::new(
- cur_type.get_precision(),
- cur_type.get_scale(),
- ));
+ )?))
+ }
+ _ => {
+ let converter =
+ FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
- DecimalConverter,
+ FixedLenBinaryConverter,
>::new(
page_iterator,
column_desc,
@@ -536,173 +307,26 @@ impl<'a> ArrayReaderBuilder {
arrow_type,
)?))
}
- PhysicalType::FIXED_LEN_BYTE_ARRAY => {
- let byte_width = match *cur_type {
- Type::PrimitiveType {
- ref type_length, ..
- } => *type_length,
- _ => {
- return Err(ArrowError(
- "Expected a physical type, not a group type".to_string(),
- ))
- }
- };
- if cur_type.get_basic_info().converted_type() == ConvertedType::INTERVAL {
- if byte_width != 12 {
- return Err(ArrowError(format!(
- "Parquet interval type should have length of 12, found {}",
- byte_width
- )));
- }
- match arrow_type {
- Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
- let converter = IntervalDayTimeConverter::new(
- IntervalDayTimeArrayConverter {},
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- _,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
- let converter = IntervalYearMonthConverter::new(
- IntervalYearMonthArrayConverter {},
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- _,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- Some(t) => Err(ArrowError(format!(
- "Cannot write a Parquet interval to {:?}",
- t
- ))),
- None => {
- // we do not support an interval not matched to an Arrow type,
- // because we risk data loss as we won't know which of the 12 bytes
- // are or should be populated
- Err(ArrowError(
- "Cannot write a Parquet interval with no Arrow type specified.
- There is a risk of data loss as Arrow either supports YearMonth or
- DayTime precision. Without the Arrow type, we cannot infer the type.
- ".to_string()
- ))
- }
- }
- } else {
- let converter = FixedLenBinaryConverter::new(
- FixedSizeArrayConverter::new(byte_width),
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- FixedLenBinaryConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- }
- }
- }
-
- /// Constructs struct array reader without considering repetition.
- fn build_for_struct_type_inner(
- &mut self,
- cur_type: &Type,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- let mut fields = Vec::with_capacity(cur_type.get_fields().len());
- let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
-
- for child in cur_type.get_fields() {
- if let Some(child_reader) = self.dispatch(child.clone(), context)? {
- // TODO: this results in calling get_arrow_field twice, it could be reused
- // from child_reader above, by making child_reader carry its `Field`
- let mut struct_context = context.clone();
- struct_context.path.append(vec![child.name().to_string()]);
- let field = match self.get_arrow_field(child, &struct_context) {
- Some(f) => f.clone(),
- _ => Field::new(
- child.name(),
- child_reader.get_data_type().clone(),
- child.is_optional(),
- ),
- };
- fields.push(field);
- children_reader.push(child_reader);
- }
- }
-
- if !fields.is_empty() {
- let arrow_type = ArrowType::Struct(fields);
- Ok(Some(Box::new(StructArrayReader::new(
- arrow_type,
- children_reader,
- context.def_level,
- context.rep_level,
- ))))
- } else {
- Ok(None)
- }
+ },
}
+}
- fn get_arrow_field(
- &self,
- cur_type: &Type,
- context: &'a ArrayReaderBuilderContext,
- ) -> Option<&Field> {
- let parts: Vec<&str> = context
- .path
- .parts()
- .iter()
- .map(|x| -> &str { x })
- .collect::<Vec<&str>>();
-
- // If the parts length is one it'll have the top level "schema" type. If
- // it's two then it'll be a top-level type that we can get from the arrow
- // schema directly.
- if parts.len() <= 2 {
- self.arrow_schema.field_with_name(cur_type.name()).ok()
- } else {
- // If it's greater than two then we need to traverse the type path
- // until we find the actual field we're looking for.
- let mut field: Option<&Field> = None;
-
- for (i, part) in parts.iter().enumerate().skip(1) {
- if i == 1 {
- field = self.arrow_schema.field_with_name(part).ok();
- } else if let Some(f) = field {
- match f.data_type() {
- ArrowType::Struct(fields) => {
- field = fields.iter().find(|f| f.name() == part)
- }
- ArrowType::List(list_field) => match list_field.data_type() {
- ArrowType::Struct(fields) => {
- field = fields.iter().find(|f| f.name() == part)
- }
- _ => field = Some(list_field.as_ref()),
- },
- _ => field = None,
- }
- } else {
- field = None;
- }
- }
- field
- }
- }
+fn build_struct_reader(
+ field: &ParquetField,
+ row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+ let children = field.children().unwrap();
+ let children_reader = children
+ .iter()
+ .map(|child| build_reader(child, row_groups))
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Box::new(StructArrayReader::new(
+ field.arrow_type.clone(),
+ children_reader,
+ field.def_level,
+ field.rep_level,
+ )) as _)
}
#[cfg(test)]
@@ -711,6 +335,7 @@ mod tests {
use crate::arrow::parquet_to_arrow_schema;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::util::test_common::get_test_file;
+ use arrow::datatypes::Field;
use std::sync::Arc;
#[test]
@@ -735,9 +360,9 @@ mod tests {
.unwrap();
// Create arrow types
- let arrow_type = ArrowType::Struct(vec![Field::new(
+ let arrow_type = DataType::Struct(vec![Field::new(
"b_struct",
- ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]),
+ DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)]),
true,
)]);
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 5b7d4865f..31796d79b 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -584,16 +584,25 @@ mod tests {
let mut array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
- Arc::new(arrow_schema.clone()),
+ Arc::new(arrow_schema),
vec![0usize].into_iter(),
Box::new(file_reader),
)
.unwrap();
let batch = array_reader.next_batch(100).unwrap();
+ assert_eq!(batch.data_type(), array_reader.get_data_type());
assert_eq!(
batch.data_type(),
- &ArrowType::Struct(arrow_schema.fields().clone())
+ &ArrowType::Struct(vec![Field::new(
+ "table_info",
+ ArrowType::List(Box::new(Field::new(
+ "table_info",
+ ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)]),
+ false
+ ))),
+ false
+ )])
);
assert_eq!(batch.len(), 0);
}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 7675707e3..0c4ded90b 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -1050,6 +1050,42 @@ mod tests {
for batch in record_batch_reader {
batch.unwrap();
}
+
+ let projected_reader = arrow_reader
+ .get_record_reader_by_columns(vec![3, 8, 10], 60)
+ .unwrap();
+ let projected_schema = arrow_reader
+ .get_schema_by_columns(vec![3, 8, 10], true)
+ .unwrap();
+
+ let expected_schema = Schema::new(vec![
+ Field::new(
+ "roll_num",
+ ArrowDataType::Struct(vec![Field::new(
+ "count",
+ ArrowDataType::UInt64,
+ false,
+ )]),
+ false,
+ ),
+ Field::new(
+ "PC_CUR",
+ ArrowDataType::Struct(vec![
+ Field::new("mean", ArrowDataType::Int64, false),
+ Field::new("sum", ArrowDataType::Int64, false),
+ ]),
+ false,
+ ),
+ ]);
+
+ // Tests for #1652 and #1654
+ assert_eq!(projected_reader.schema().as_ref(), &projected_schema);
+ assert_eq!(expected_schema, projected_schema);
+
+ for batch in projected_reader {
+ let batch = batch.unwrap();
+ assert_eq!(batch.schema().as_ref(), &projected_schema);
+ }
}
#[test]
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index bfccfe7f9..71184e0b6 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -23,22 +23,24 @@
//!
//! The interfaces for converting arrow schema to parquet schema is coming.
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
use std::sync::Arc;
-use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
+use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::ipc::writer;
+use crate::basic::{
+ ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit,
+ Type as PhysicalType,
+};
use crate::errors::{ParquetError::ArrowError, Result};
use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
-use crate::{
- basic::{
- ConvertedType, LogicalType, Repetition,
- TimeUnit as ParquetTimeUnit, Type as PhysicalType,
- },
- errors::ParquetError,
-};
+
+mod complex;
+mod primitive;
+
+pub(crate) use complex::{convert_schema, ParquetField, ParquetFieldType};
/// Convert Parquet schema to Arrow schema including optional metadata.
/// Attempts to decode any existing Arrow schema metadata, falling back
@@ -47,15 +49,11 @@ pub fn parquet_to_arrow_schema(
parquet_schema: &SchemaDescriptor,
key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
- let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
- metadata
- .remove(super::ARROW_SCHEMA_META_KEY)
- .map(|encoded| get_arrow_schema_from_metadata(&encoded))
- .unwrap_or(parquet_to_arrow_schema_by_columns(
- parquet_schema,
- 0..parquet_schema.columns().len(),
- key_value_metadata,
- ))
+ parquet_to_arrow_schema_by_columns(
+ parquet_schema,
+ 0..parquet_schema.columns().len(),
+ key_value_metadata,
+ )
}
/// Convert parquet schema to arrow schema including optional metadata,
@@ -122,58 +120,25 @@ where
T: IntoIterator<Item = usize>,
{
let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
- let arrow_schema_metadata = metadata
+ let maybe_schema = metadata
.remove(super::ARROW_SCHEMA_META_KEY)
- .map(|encoded| get_arrow_schema_from_metadata(&encoded))
- .map_or(Ok(None), |v| v.map(Some))?;
+ .map(|value| get_arrow_schema_from_metadata(&value))
+ .transpose()?;
- // add the Arrow metadata to the Parquet metadata
- if let Some(arrow_schema) = &arrow_schema_metadata {
+ // Add the Arrow metadata to the Parquet metadata skipping keys that collide
+ if let Some(arrow_schema) = &maybe_schema {
arrow_schema.metadata().iter().for_each(|(k, v)| {
- metadata.insert(k.clone(), v.clone());
+ metadata.entry(k.clone()).or_insert(v.clone());
});
}
- let mut base_nodes = Vec::new();
- let mut base_nodes_set = HashSet::new();
- let mut leaves = HashSet::new();
-
- enum FieldType<'a> {
- Parquet(&'a Type),
- Arrow(Field),
- }
-
- for c in column_indices {
- let column = parquet_schema.column(c);
- let name = column.name();
-
- if let Some(field) = arrow_schema_metadata
- .as_ref()
- .and_then(|schema| schema.field_with_name(name).ok().cloned())
- {
- base_nodes.push(FieldType::Arrow(field));
- } else {
- let column = column.self_type() as *const Type;
- let root = parquet_schema.get_column_root(c);
- let root_raw_ptr = root as *const Type;
-
- leaves.insert(column);
- if !base_nodes_set.contains(&root_raw_ptr) {
- base_nodes.push(FieldType::Parquet(root));
- base_nodes_set.insert(root_raw_ptr);
- }
- }
+ match convert_schema(parquet_schema, column_indices, maybe_schema.as_ref())? {
+ Some(field) => match field.arrow_type {
+ DataType::Struct(fields) => Ok(Schema::new_with_metadata(fields, metadata)),
+ _ => unreachable!(),
+ },
+ None => Ok(Schema::new_with_metadata(vec![], metadata)),
}
-
- base_nodes
- .into_iter()
- .map(|t| match t {
- FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(),
- FieldType::Arrow(f) => Ok(Some(f)),
- })
- .collect::<Result<Vec<Option<Field>>>>()
- .map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
- .map(|fields| Schema::new_with_metadata(fields, metadata))
}
/// Try to convert Arrow schema metadata into a schema
@@ -299,14 +264,13 @@ fn parse_key_value_metadata(
/// Convert parquet column schema to arrow field.
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
- let schema = parquet_column.self_type();
-
- let mut leaves = HashSet::new();
- leaves.insert(parquet_column.self_type() as *const Type);
+ let field = complex::convert_type(&parquet_column.self_type_ptr())?;
- ParquetTypeConverter::new(schema, &leaves)
- .to_field()
- .map(|opt| opt.unwrap())
+ Ok(Field::new(
+ parquet_column.name(),
+ field.arrow_type,
+ field.nullable,
+ ))
}
pub fn decimal_length_from_precision(precision: usize) -> usize {
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
.with_repetition(repetition)
.build(),
- DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
- name,
- PhysicalType::INT64,
- )
- .with_logical_type(Some(LogicalType::Timestamp {
- is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
- unit: match time_unit {
- TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
- TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
- TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
- TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
- },
- }))
- .with_repetition(repetition)
- .build(),
+ DataType::Timestamp(TimeUnit::Second, _) => {
+ // Cannot represent seconds in LogicalType
+ Type::primitive_type_builder(name, PhysicalType::INT64)
+ .with_repetition(repetition)
+ .build()
+ }
+ DataType::Timestamp(time_unit, _) => {
+ Type::primitive_type_builder(name, PhysicalType::INT64)
+ .with_logical_type(Some(LogicalType::Timestamp {
+ is_adjusted_to_u_t_c: false,
+ unit: match time_unit {
+ TimeUnit::Second => unreachable!(),
+ TimeUnit::Millisecond => {
+ ParquetTimeUnit::MILLIS(Default::default())
+ }
+ TimeUnit::Microsecond => {
+ ParquetTimeUnit::MICROS(Default::default())
+ }
+ TimeUnit::Nanosecond => {
+ ParquetTimeUnit::NANOS(Default::default())
+ }
+ },
+ }))
+ .with_repetition(repetition)
+ .build()
+ }
DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
.build(),
- // date64 is cast to date32
+ // date64 is cast to date32 (#1666)
DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
.build(),
- DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
+ DataType::Time32(TimeUnit::Second) => {
+ // Cannot represent seconds in LogicalType
+ Type::primitive_type_builder(name, PhysicalType::INT32)
+ .with_repetition(repetition)
+ .build()
+ }
+ DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: false,
- unit: ParquetTimeUnit::MILLIS(Default::default()),
+ unit: match unit {
+ TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
+ u => unreachable!("Invalid unit for Time32: {:?}", u),
+ },
}))
.with_repetition(repetition)
.build(),
@@ -544,502 +528,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
}
}
}
-/// This struct is used to group methods and data structures used to convert parquet
-/// schema together.
-struct ParquetTypeConverter<'a> {
- schema: &'a Type,
- /// This is the columns that need to be converted to arrow schema.
- columns_to_convert: &'a HashSet<*const Type>,
-}
-
-impl<'a> ParquetTypeConverter<'a> {
- fn new(schema: &'a Type, columns_to_convert: &'a HashSet<*const Type>) -> Self {
- Self {
- schema,
- columns_to_convert,
- }
- }
-
- fn clone_with_schema(&self, other: &'a Type) -> Self {
- Self {
- schema: other,
- columns_to_convert: self.columns_to_convert,
- }
- }
-}
-
-impl ParquetTypeConverter<'_> {
- // Public interfaces.
-
- /// Converts parquet schema to arrow data type.
- ///
- /// This function discards schema name.
- ///
- /// If this schema is a primitive type and not included in the leaves, the result is
- /// Ok(None).
- ///
- /// If this schema is a group type and none of its children is reserved in the
- /// conversion, the result is Ok(None).
- fn to_data_type(&self) -> Result<Option<DataType>> {
- match self.schema {
- Type::PrimitiveType { .. } => self.to_primitive_type(),
- Type::GroupType { .. } => self.to_group_type(),
- }
- }
-
- /// Converts parquet schema to arrow field.
- ///
- /// This method is roughly the same as
- /// [`to_data_type`](`ParquetTypeConverter::to_data_type`), except it reserves schema
- /// name.
- fn to_field(&self) -> Result<Option<Field>> {
- self.to_data_type().map(|opt| {
- opt.map(|dt| Field::new(self.schema.name(), dt, self.is_nullable()))
- })
- }
-
- // Utility functions.
-
- /// Checks whether this schema is nullable.
- fn is_nullable(&self) -> bool {
- let basic_info = self.schema.get_basic_info();
- if basic_info.has_repetition() {
- match basic_info.repetition() {
- Repetition::OPTIONAL => true,
- Repetition::REPEATED => true,
- Repetition::REQUIRED => false,
- }
- } else {
- false
- }
- }
-
- fn is_repeated(&self) -> bool {
- let basic_info = self.schema.get_basic_info();
-
- basic_info.has_repetition() && basic_info.repetition() == Repetition::REPEATED
- }
-
- fn is_self_included(&self) -> bool {
- self.columns_to_convert
- .contains(&(self.schema as *const Type))
- }
-
- // Functions for primitive types.
-
- /// Entry point for converting parquet primitive type to arrow type.
- ///
- /// This function takes care of repetition.
- fn to_primitive_type(&self) -> Result<Option<DataType>> {
- if self.is_self_included() {
- self.to_primitive_type_inner().map(|dt| {
- if self.is_repeated() {
- Some(DataType::List(Box::new(Field::new(
- self.schema.name(),
- dt,
- self.is_nullable(),
- ))))
- } else {
- Some(dt)
- }
- })
- } else {
- Ok(None)
- }
- }
-
- /// Converting parquet primitive type to arrow data type.
- fn to_primitive_type_inner(&self) -> Result<DataType> {
- match self.schema.get_physical_type() {
- PhysicalType::BOOLEAN => Ok(DataType::Boolean),
- PhysicalType::INT32 => self.from_int32(),
- PhysicalType::INT64 => self.from_int64(),
- PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
- PhysicalType::FLOAT => Ok(DataType::Float32),
- PhysicalType::DOUBLE => Ok(DataType::Float64),
- PhysicalType::BYTE_ARRAY => self.from_byte_array(),
- PhysicalType::FIXED_LEN_BYTE_ARRAY => self.from_fixed_len_byte_array(),
- }
- }
-
- #[allow(clippy::wrong_self_convention)]
- fn from_int32(&self) -> Result<DataType> {
- match (
- self.schema.get_basic_info().logical_type(),
- self.schema.get_basic_info().converted_type(),
- ) {
- (None, ConvertedType::NONE) => Ok(DataType::Int32),
- (Some(ref t @ LogicalType::Integer {
- bit_width,
- is_signed,
- }), _) => match (bit_width, is_signed) {
- (8, true) => Ok(DataType::Int8),
- (16, true) => Ok(DataType::Int16),
- (32, true) => Ok(DataType::Int32),
- (8, false) => Ok(DataType::UInt8),
- (16, false) => Ok(DataType::UInt16),
- (32, false) => Ok(DataType::UInt32),
- _ => Err(ArrowError(format!(
- "Cannot create INT32 physical type from {:?}",
- t,
- ))),
- },
- (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
- (Some(LogicalType::Date), _) => Ok(DataType::Date32),
- (Some(LogicalType::Time { unit, .. }), _) => match unit {
- ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
- _ => Err(ArrowError(format!(
- "Cannot create INT32 physical type from {:?}",
- unit
- ))),
- },
- // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
- (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
- (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
- (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
- (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
- (None, ConvertedType::INT_8) => Ok(DataType::Int8),
- (None, ConvertedType::INT_16) => Ok(DataType::Int16),
- (None, ConvertedType::INT_32) => Ok(DataType::Int32),
- (None, ConvertedType::DATE) => Ok(DataType::Date32),
- (None, ConvertedType::TIME_MILLIS) => {
- Ok(DataType::Time32(TimeUnit::Millisecond))
- }
- (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
- (logical, converted) => Err(ArrowError(format!(
- "Unable to convert parquet INT32 logical type {:?} or converted type {}",
- logical, converted
- ))),
- }
- }
-
- #[allow(clippy::wrong_self_convention)]
- fn from_int64(&self) -> Result<DataType> {
- match (
- self.schema.get_basic_info().logical_type(),
- self.schema.get_basic_info().converted_type(),
- ) {
- (None, ConvertedType::NONE) => Ok(DataType::Int64),
- (Some(LogicalType::Integer { bit_width, is_signed }), _) if bit_width == 64 => {
- match is_signed {
- true => Ok(DataType::Int64),
- false => Ok(DataType::UInt64),
- }
- }
- (Some(LogicalType::Time { unit, .. }), _) => match unit {
- ParquetTimeUnit::MILLIS(_) => Err(ArrowError(
- "Cannot create INT64 from MILLIS time unit".to_string(),
- )),
- ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
- ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
- },
- (Some(LogicalType::Timestamp { is_adjusted_to_u_t_c, unit }), _) => Ok(DataType::Timestamp(
- match unit {
- ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
- ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
- ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
- },
- if is_adjusted_to_u_t_c {
- Some("UTC".to_string())
- } else {
- None
- },
- )),
- (None, ConvertedType::INT_64) => Ok(DataType::Int64),
- (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
- (None, ConvertedType::TIME_MICROS) => {
- Ok(DataType::Time64(TimeUnit::Microsecond))
- }
- (None, ConvertedType::TIMESTAMP_MILLIS) => {
- Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
- }
- (None, ConvertedType::TIMESTAMP_MICROS) => {
- Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
- }
- (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
- (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
- (logical, converted) => Err(ArrowError(format!(
- "Unable to convert parquet INT64 logical type {:?} or converted type {}",
- logical, converted
- ))),
- }
- }
-
- #[allow(clippy::wrong_self_convention)]
- fn from_fixed_len_byte_array(&self) -> Result<DataType> {
- match (
- self.schema.get_basic_info().logical_type(),
- self.schema.get_basic_info().converted_type(),
- ) {
- (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
- (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
- (None, ConvertedType::INTERVAL) => {
- // There is currently no reliable way of determining which IntervalUnit
- // to return. Thus without the original Arrow schema, the results
- // would be incorrect if all 12 bytes of the interval are populated
- Ok(DataType::Interval(IntervalUnit::DayTime))
- }
- _ => {
- let byte_width = match self.schema {
- Type::PrimitiveType {
- ref type_length, ..
- } => *type_length,
- _ => {
- return Err(ArrowError(
- "Expected a physical type, not a group type".to_string(),
- ))
- }
- };
-
- Ok(DataType::FixedSizeBinary(byte_width))
- }
- }
- }
-
- fn to_decimal(&self) -> DataType {
- assert!(self.schema.is_primitive());
- DataType::Decimal(
- self.schema.get_precision() as usize,
- self.schema.get_scale() as usize,
- )
- }
-
- #[allow(clippy::wrong_self_convention)]
- fn from_byte_array(&self) -> Result<DataType> {
- match (self.schema.get_basic_info().logical_type(), self.schema.get_basic_info().converted_type()) {
- (Some(LogicalType::String), _) => Ok(DataType::Utf8),
- (Some(LogicalType::Json), _) => Ok(DataType::Binary),
- (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
- (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
- (None, ConvertedType::NONE) => Ok(DataType::Binary),
- (None, ConvertedType::JSON) => Ok(DataType::Binary),
- (None, ConvertedType::BSON) => Ok(DataType::Binary),
- (None, ConvertedType::ENUM) => Ok(DataType::Binary),
- (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
- (logical, converted) => Err(ArrowError(format!(
- "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
- logical, converted
- ))),
- }
- }
-
- // Functions for group types.
-
- /// Entry point for converting parquet group type.
- ///
- /// This function takes care of logical type and repetition.
- fn to_group_type(&self) -> Result<Option<DataType>> {
- match (
- self.schema.get_basic_info().logical_type(),
- self.schema.get_basic_info().converted_type(),
- ) {
- (Some(LogicalType::List), _) | (_, ConvertedType::LIST) => self.to_list(),
- (Some(LogicalType::Map), _)
- | (_, ConvertedType::MAP)
- | (_, ConvertedType::MAP_KEY_VALUE) => self.to_map(),
- (_, _) => {
- if self.is_repeated() {
- self.to_struct().map(|opt| {
- opt.map(|dt| {
- DataType::List(Box::new(Field::new(
- self.schema.name(),
- dt,
- self.is_nullable(),
- )))
- })
- })
- } else {
- self.to_struct()
- }
- }
- }
- }
-
- /// Converts a parquet group type to arrow struct.
- fn to_struct(&self) -> Result<Option<DataType>> {
- match self.schema {
- Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
- "{:?} is a struct type, and can't be processed as primitive.",
- self.schema
- ))),
- Type::GroupType {
- basic_info: _,
- fields,
- } => fields
- .iter()
- .map(|field_ptr| self.clone_with_schema(field_ptr).to_field())
- .collect::<Result<Vec<Option<Field>>>>()
- .map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
- .map(|fields| {
- if fields.is_empty() {
- None
- } else {
- Some(DataType::Struct(fields))
- }
- }),
- }
- }
-
- /// Converts a parquet list to arrow list.
- ///
- /// To fully understand this algorithm, please refer to
- /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
- fn to_list(&self) -> Result<Option<DataType>> {
- match self.schema {
- Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
- "{:?} is a list type and can't be processed as primitive.",
- self.schema
- ))),
- Type::GroupType {
- basic_info: _,
- fields,
- } if fields.len() == 1 => {
- let list_item = fields.first().unwrap();
- let item_converter = self.clone_with_schema(list_item);
-
- let item_type = match list_item.as_ref() {
- Type::PrimitiveType { .. } => {
- if item_converter.is_repeated() {
- item_converter.to_primitive_type_inner().map(Some)
- } else {
- Err(ArrowError(
- "Primitive element type of list must be repeated."
- .to_string(),
- ))
- }
- }
- Type::GroupType {
- basic_info: _,
- fields,
- } => {
- if fields.len() > 1 {
- item_converter.to_struct()
- } else if fields.len() == 1
- && list_item.name() != "array"
- && list_item.name() != format!("{}_tuple", self.schema.name())
- {
- let nested_item = fields.first().unwrap();
- let nested_item_converter =
- self.clone_with_schema(nested_item);
-
- nested_item_converter.to_data_type()
- } else {
- item_converter.to_struct()
- }
- }
- };
-
- // Check that the name of the list child is "list", in which case we
- // get the child nullability and name (normally "element") from the nested
- // group type.
- // Without this step, the child incorrectly inherits the parent's optionality
- let (list_item_name, item_is_optional) = match &item_converter.schema {
- Type::GroupType { basic_info, fields }
- if basic_info.name() == "list" && fields.len() == 1 =>
- {
- let field = fields.first().unwrap();
- (field.name(), field.is_optional())
- }
- _ => (list_item.name(), list_item.is_optional()),
- };
-
- item_type.map(|opt| {
- opt.map(|dt| {
- DataType::List(Box::new(Field::new(
- list_item_name,
- dt,
- item_is_optional,
- )))
- })
- })
- }
- _ => Err(ArrowError(
- "Group element type of list can only contain one field.".to_string(),
- )),
- }
- }
-
- /// Converts a parquet map to arrow map.
- ///
- /// To fully understand this algorithm, please refer to
- /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
- fn to_map(&self) -> Result<Option<DataType>> {
- match self.schema {
- Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
- "{:?} is a map type and can't be processed as primitive.",
- self.schema
- ))),
- Type::GroupType {
- basic_info: _,
- fields,
- } if fields.len() == 1 => {
- let key_item = fields.first().unwrap();
-
- let (key_type, value_type) = match key_item.as_ref() {
- Type::PrimitiveType { .. } => {
- return Err(ArrowError(
- "A map can only have a group child type (key_values)."
- .to_string(),
- ))
- }
- Type::GroupType {
- basic_info: _,
- fields,
- } => {
- if fields.len() != 2 {
- return Err(ArrowError(format!("Map type should have 2 fields, a key and value. Found {} fields", fields.len())));
- } else {
- let nested_key = fields.first().unwrap();
- let nested_key_converter = self.clone_with_schema(nested_key);
-
- let nested_value = fields.last().unwrap();
- let nested_value_converter =
- self.clone_with_schema(nested_value);
-
- (
- nested_key_converter.to_data_type()?.map(|d| {
- Field::new(
- nested_key.name(),
- d,
- nested_key.is_optional(),
- )
- }),
- nested_value_converter.to_data_type()?.map(|d| {
- Field::new(
- nested_value.name(),
- d,
- nested_value.is_optional(),
- )
- }),
- )
- }
- }
- };
-
- match (key_type, value_type) {
- (Some(key), Some(value)) => Ok(Some(DataType::Map(
- Box::new(Field::new(
- key_item.name(),
- DataType::Struct(vec![key, value]),
- self.schema.is_optional(),
- )),
- false, // There is no information to tell if keys are sorted
- ))),
- (None, None) => Ok(None),
- (None, Some(_)) => Err(ArrowError(
- "Could not convert the map key to a valid datatype".to_string(),
- )),
- (Some(_), None) => Err(ArrowError(
- "Could not convert the map value to a valid datatype".to_string(),
- )),
- }
- }
- _ => Err(ArrowError(
- "Group element type of map can only contain one field.".to_string(),
- )),
- }
- }
-}
#[cfg(test)]
mod tests {
@@ -1261,7 +749,7 @@ mod tests {
{
arrow_fields.push(Field::new(
"my_list",
- DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
+ DataType::List(Box::new(Field::new("str", DataType::Utf8, false))),
true,
));
}
@@ -1273,7 +761,7 @@ mod tests {
{
arrow_fields.push(Field::new(
"my_list",
- DataType::List(Box::new(Field::new("element", DataType::Int32, true))),
+ DataType::List(Box::new(Field::new("element", DataType::Int32, false))),
true,
));
}
@@ -1292,7 +780,7 @@ mod tests {
]);
arrow_fields.push(Field::new(
"my_list",
- DataType::List(Box::new(Field::new("element", arrow_struct, true))),
+ DataType::List(Box::new(Field::new("element", arrow_struct, false))),
true,
));
}
@@ -1309,7 +797,7 @@ mod tests {
DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]);
arrow_fields.push(Field::new(
"my_list",
- DataType::List(Box::new(Field::new("array", arrow_struct, true))),
+ DataType::List(Box::new(Field::new("array", arrow_struct, false))),
true,
));
}
@@ -1326,7 +814,11 @@ mod tests {
DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]);
arrow_fields.push(Field::new(
"my_list",
- DataType::List(Box::new(Field::new("my_list_tuple", arrow_struct, true))),
+ DataType::List(Box::new(Field::new(
+ "my_list_tuple",
+ arrow_struct,
+ false,
+ ))),
true,
));
}
@@ -1336,8 +828,8 @@ mod tests {
{
arrow_fields.push(Field::new(
"name",
- DataType::List(Box::new(Field::new("name", DataType::Int32, true))),
- true,
+ DataType::List(Box::new(Field::new("name", DataType::Int32, false))),
+ false,
));
}
@@ -1350,7 +842,7 @@ mod tests {
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
- assert_eq!(arrow_fields[i], converted_fields[i]);
+ assert_eq!(arrow_fields[i], converted_fields[i], "{}", i);
}
}
@@ -1503,7 +995,7 @@ mod tests {
Field::new("str", DataType::Utf8, false),
Field::new("num", DataType::Int32, false),
]),
- true,
+ false, // (#1697)
)),
false,
),
@@ -1528,7 +1020,7 @@ mod tests {
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Int32, true),
]),
- true,
+ false, // (#1697)
)),
false,
),
@@ -1636,21 +1128,39 @@ mod tests {
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
+
+ let err =
+ parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 2, 4], None)
+ .unwrap_err()
+ .to_string();
+
+ assert!(
+ err.contains("out of order projection is not supported"),
+ "{}",
+ err
+ );
+
+ let err =
+ parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 3, 4], None)
+ .unwrap_err()
+ .to_string();
+
+ assert!(err.contains("repeated column projection is not supported, column 3 appeared multiple times"), "{}", err);
}
#[test]
fn test_nested_schema_partial_ordering() {
let mut arrow_fields = Vec::new();
{
+ let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)];
+ let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
+ arrow_fields.push(group1);
+
let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)];
let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
arrow_fields.push(group2);
arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
-
- let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)];
- let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
- arrow_fields.push(group1);
}
let message_type = "
@@ -1679,7 +1189,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
+ parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)
.unwrap();
let converted_fields = converted_arrow_schema.fields();
@@ -1700,9 +1210,9 @@ mod tests {
DataType::List(Box::new(Field::new(
"innerGroup",
DataType::Struct(vec![Field::new("leaf3", DataType::Int32, true)]),
- true,
+ false,
))),
- true,
+ false,
);
let outer_group_list = Field::new(
@@ -1713,9 +1223,9 @@ mod tests {
Field::new("leaf2", DataType::Int32, true),
inner_group_list,
]),
- true,
+ false,
))),
- true,
+ false,
);
arrow_fields.push(outer_group_list);
}
@@ -1790,8 +1300,8 @@ mod tests {
Field::new("string", DataType::Utf8, true),
Field::new(
"bools",
- DataType::List(Box::new(Field::new("bools", DataType::Boolean, true))),
- true,
+ DataType::List(Box::new(Field::new("bools", DataType::Boolean, false))),
+ false,
),
Field::new("date", DataType::Date32, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
@@ -2099,7 +1609,7 @@ mod tests {
true,
),
]),
- true,
+ false, // #1697
)),
false, // fails to roundtrip keys_sorted
),
@@ -2122,7 +1632,7 @@ mod tests {
true,
),
]),
- true,
+ false, // #1697
)),
false, // fails to roundtrip keys_sorted
),
@@ -2179,7 +1689,6 @@ mod tests {
}
#[test]
- #[ignore = "Roundtrip of lists currently fails because we don't check their types correctly in the Arrow schema"]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
let metadata: HashMap<String, String> =
[("Key".to_string(), "Value".to_string())]
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
new file mode 100644
index 000000000..31a9d6e82
--- /dev/null
+++ b/parquet/src/arrow/schema/complex.rs
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+ let info = t.get_basic_info();
+ match info.has_repetition() {
+ true => info.repetition(),
+ false => Repetition::REQUIRED,
+ }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+ /// The level which represents an insertion into the current list
+ /// i.e. guaranteed to be > 0 for a list type
+ pub rep_level: i16,
+ /// The level at which this field is fully defined,
+ /// i.e. guaranteed to be > 0 for a nullable type
+ pub def_level: i16,
+ /// Whether this field is nullable
+ pub nullable: bool,
+ /// The arrow type of the column data
+ ///
+ /// Note: In certain cases the data stored in parquet may have been coerced
+ /// to a different type and will require conversion on read (e.g. Date64 and Interval)
+ pub arrow_type: DataType,
+ /// The type of this field
+ pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+ /// Converts `self` into an arrow list, with its current type as the field type
+ ///
+ /// This is used to convert repeated columns, into their arrow representation
+ fn into_list(self, name: &str) -> Self {
+ ParquetField {
+ rep_level: self.rep_level,
+ def_level: self.def_level,
+ nullable: false,
+ arrow_type: DataType::List(Box::new(Field::new(
+ name,
+ self.arrow_type.clone(),
+ false,
+ ))),
+ field_type: ParquetFieldType::Group {
+ children: vec![self],
+ },
+ }
+ }
+
+ /// Returns a list of [`ParquetField`] children if this is a group type
+ pub fn children(&self) -> Option<&[Self]> {
+ match &self.field_type {
+ ParquetFieldType::Primitive { .. } => None,
+ ParquetFieldType::Group { children } => Some(children),
+ }
+ }
+}
+
+pub enum ParquetFieldType {
+ Primitive {
+ /// The index of the column in parquet
+ col_idx: usize,
+ /// The type of the column in parquet
+ primitive_type: TypePtr,
+ },
+ Group {
+ children: Vec<ParquetField>,
+ },
+}
+
+/// Encodes the context of the parent of the field currently under consideration
+struct VisitorContext {
+ rep_level: i16,
+ def_level: i16,
+ /// An optional [`DataType`] sourced from the embedded arrow schema
+ data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+ /// Compute the resulting definition level, repetition level and nullability
+ /// for a child field with the given [`Repetition`]
+ fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+ match repetition {
+ Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+ Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+ Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+ }
+ }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+ /// The column index of the next leaf column
+ next_col_idx: usize,
+
+ /// Mask of columns to include
+ column_mask: Vec<bool>,
+}
+
+impl Visitor {
+ fn visit_primitive(
+ &mut self,
+ primitive_type: &TypePtr,
+ context: VisitorContext,
+ ) -> Result<Option<ParquetField>> {
+ let col_idx = self.next_col_idx;
+ self.next_col_idx += 1;
+
+ if !self.column_mask[col_idx] {
+ return Ok(None);
+ }
+
+ let repetition = get_repetition(primitive_type);
+ let (def_level, rep_level, nullable) = context.levels(repetition);
+
+ let arrow_type = convert_primitive(primitive_type, context.data_type)?;
+
+ let primitive_field = ParquetField {
+ rep_level,
+ def_level,
+ nullable,
+ arrow_type,
+ field_type: ParquetFieldType::Primitive {
+ primitive_type: primitive_type.clone(),
+ col_idx,
+ },
+ };
+
+ Ok(Some(match repetition {
+ Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
+ _ => primitive_field,
+ }))
+ }
+
+ fn visit_struct(
+ &mut self,
+ struct_type: &TypePtr,
+ context: VisitorContext,
+ ) -> Result<Option<ParquetField>> {
+ // The root type will not have a repetition level
+ let repetition = get_repetition(struct_type);
+ let (def_level, rep_level, nullable) = context.levels(repetition);
+
+ let parquet_fields = struct_type.get_fields();
+
+ // Extract the arrow fields
+ let arrow_fields = match &context.data_type {
+ Some(DataType::Struct(fields)) => {
+ if fields.len() != parquet_fields.len() {
+ return Err(arrow_err!(
+ "incompatible arrow schema, expected {} struct fields got {}",
+ parquet_fields.len(),
+ fields.len()
+ ));
+ }
+ Some(fields)
+ }
+ Some(d) => {
+ return Err(arrow_err!(
+ "incompatible arrow schema, expected struct got {}",
+ d
+ ))
+ }
+ None => None,
+ };
+
+ let mut child_fields = Vec::with_capacity(parquet_fields.len());
+ let mut children = Vec::with_capacity(parquet_fields.len());
+
+ // Perform a DFS of children
+ for (idx, parquet_field) in parquet_fields.iter().enumerate() {
+ let data_type = match arrow_fields {
+ Some(fields) => {
+ let field = &fields[idx];
+ if field.name() != parquet_field.name() {
+ return Err(arrow_err!(
+ "incompatible arrow schema, expected field named {} got {}",
+ parquet_field.name(),
+ field.name()
+ ));
+ }
+ Some(field.data_type().clone())
+ }
+ None => None,
+ };
+
+ let arrow_field = arrow_fields.map(|x| &x[idx]);
+ let child_ctx = VisitorContext {
+ rep_level,
+ def_level,
+ data_type,
+ };
+
+ if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
+ // The child type returned may be different from what is encoded in the arrow
+ // schema in the event of a mismatch or a projection
+ child_fields.push(convert_field(parquet_field, &child, arrow_field));
+ children.push(child);
+ }
+ }
+
+ if children.is_empty() {
+ return Ok(None);
+ }
+
+ let struct_field = ParquetField {
+ rep_level,
+ def_level,
+ nullable,
+ arrow_type: DataType::Struct(child_fields),
+ field_type: ParquetFieldType::Group { children },
+ };
+
+ Ok(Some(match repetition {
+ Repetition::REPEATED => struct_field.into_list(struct_type.name()),
+ _ => struct_field,
+ }))
+ }
+
+    /// Converts a MAP (or MAP_KEY_VALUE) annotated group into a [`ParquetField`]
+    ///
+    /// The group must contain a single repeated key_value child, which in turn holds a
+    /// required key and a non-repeated value. Returns `Ok(None)` unless both the key and
+    /// the value produce a field.
+    fn visit_map(
+        &mut self,
+        map_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // The repeated key_value group adds one repetition level and one definition
+        // level; an optional map adds a further definition level on top of that
+        let rep_level = context.rep_level + 1;
+        let (def_level, nullable) = match get_repetition(map_type) {
+            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
+            Repetition::OPTIONAL => (context.def_level + 2, true),
+            Repetition::REQUIRED => (context.def_level + 1, false),
+        };
+
+        let entries = map_type.get_fields();
+        if entries.len() != 1 {
+            return Err(arrow_err!(
+                "Map field must have exactly one key_value child, found {}",
+                entries.len()
+            ));
+        }
+
+        let map_key_value = &entries[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(arrow_err!("Child of map field must be repeated"));
+        }
+
+        let key_value_fields = map_key_value.get_fields();
+        if key_value_fields.len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(arrow_err!(
+                "Child of map field must have two children, found {}",
+                key_value_fields.len()
+            ));
+        }
+
+        let (map_key, map_value) = (&key_value_fields[0], &key_value_fields[1]);
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(arrow_err!("Map keys must be required"));
+        }
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(arrow_err!("Map values cannot be repeated"));
+        }
+
+        // Pull the entry, key and value fields out of the embedded arrow hint, if any
+        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
+            None => (None, None, None, false),
+            Some(DataType::Map(entry, sorted)) => {
+                let fields = match entry.data_type() {
+                    DataType::Struct(fields) => fields,
+                    d => {
+                        return Err(arrow_err!(
+                            "Map data type should contain struct got {}",
+                            d
+                        ));
+                    }
+                };
+                if fields.len() != 2 {
+                    return Err(arrow_err!(
+                        "Map data type should contain struct with two children, got {}",
+                        fields.len()
+                    ));
+                }
+                (Some(entry), Some(&fields[0]), Some(&fields[1]), *sorted)
+            }
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected map got {}",
+                    d
+                ));
+            }
+        };
+
+        // Keys and values share the levels of the key_value group; the key is visited
+        // before the value so columns are consumed in schema order
+        let key_context = VisitorContext {
+            rep_level,
+            def_level,
+            data_type: arrow_key.map(|f| f.data_type().clone()),
+        };
+        let maybe_key = self.dispatch(map_key, key_context)?;
+
+        let value_context = VisitorContext {
+            rep_level,
+            def_level,
+            data_type: arrow_value.map(|f| f.data_type().clone()),
+        };
+        let maybe_value = self.dispatch(map_value, value_context)?;
+
+        // A map is only produced when both its key and value are projected
+        let (key, value) = match (maybe_key, maybe_value) {
+            (Some(key), Some(value)) => (key, value),
+            _ => return Ok(None),
+        };
+
+        let key_field = convert_field(map_key, &key, arrow_key);
+        let value_field = convert_field(map_value, &value, arrow_value);
+        let entries_field = Field::new(
+            map_key_value.name(),
+            DataType::Struct(vec![key_field, value_field]),
+            false, // The inner map field is always non-nullable (#1697)
+        )
+        .with_metadata(arrow_map.and_then(|f| f.metadata().cloned()));
+
+        Ok(Some(ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type: DataType::Map(Box::new(entries_field), sorted),
+            field_type: ParquetFieldType::Group {
+                children: vec![key, value],
+            },
+        }))
+    }
+
+ /// Converts a LIST-annotated group into a [`ParquetField`]
+ ///
+ /// Handles the standard three-level list layout as well as the legacy layouts
+ /// covered by the backward-compatibility rules of the parquet LIST specification,
+ /// which are applied in order below.
+ ///
+ /// Returns an error if the group is malformed or incompatible with the arrow hint in
+ /// `context.data_type`, and propagates `Ok(None)` from visiting the element
+ /// (presumably when none of its leaf columns are projected).
+ fn visit_list(
+ &mut self,
+ list_type: &TypePtr,
+ context: VisitorContext,
+ ) -> Result<Option<ParquetField>> {
+ if list_type.is_primitive() {
+ return Err(arrow_err!(
+ "{:?} is a list type and can't be processed as primitive.",
+ list_type
+ ));
+ }
+
+ let fields = list_type.get_fields();
+ if fields.len() != 1 {
+ return Err(arrow_err!(
+ "list type must have a single child, found {}",
+ fields.len()
+ ));
+ }
+
+ // The single child is the repeated field that gives the list its repetition
+ let repeated_field = &fields[0];
+ if get_repetition(repeated_field) != Repetition::REPEATED {
+ return Err(arrow_err!("List child must be repeated"));
+ }
+
+ // An optional list takes one extra definition level so a null list can be
+ // encoded; a required list adds none
+ let (def_level, nullable) = match list_type.get_basic_info().repetition() {
+ Repetition::REQUIRED => (context.def_level, false),
+ Repetition::OPTIONAL => (context.def_level + 1, true),
+ Repetition::REPEATED => {
+ return Err(arrow_err!("List type cannot be repeated"))
+ }
+ };
+
+ // Extract the element field from the arrow hint; any list flavour is accepted
+ let arrow_field = match &context.data_type {
+ Some(DataType::List(f)) => Some(f.as_ref()),
+ Some(DataType::LargeList(f)) => Some(f.as_ref()),
+ Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
+ Some(d) => {
+ return Err(arrow_err!(
+ "incompatible arrow schema, expected list got {}",
+ d
+ ))
+ }
+ None => None,
+ };
+
+ if repeated_field.is_primitive() {
+ // If the repeated field is not a group, then its type is the element type and elements are required.
+ //
+ // required/optional group my_list (LIST) {
+ // repeated int32 element;
+ // }
+ //
+ let context = VisitorContext {
+ rep_level: context.rep_level,
+ def_level,
+ data_type: arrow_field.map(|f| f.data_type().clone()),
+ };
+
+ return match self.visit_primitive(repeated_field, context) {
+ Ok(Some(mut field)) => {
+ // visit_primitive will infer a non-nullable list, update if necessary
+ field.nullable = nullable;
+ Ok(Some(field))
+ }
+ r => r,
+ };
+ }
+
+ let items = repeated_field.get_fields();
+ if items.len() != 1
+ || repeated_field.name() == "array"
+ || repeated_field.name() == format!("{}_tuple", list_type.name())
+ {
+ // If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
+ //
+ // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name
+ // with _tuple appended then the repeated type is the element type and elements are required.
+ //
+ // visit_struct wraps a REPEATED group in a list (see `into_list`), so only the
+ // nullability of the list itself needs to be fixed up afterwards
+ let context = VisitorContext {
+ rep_level: context.rep_level,
+ def_level,
+ data_type: arrow_field.map(|f| f.data_type().clone()),
+ };
+
+ return match self.visit_struct(repeated_field, context) {
+ Ok(Some(mut field)) => {
+ field.nullable = nullable;
+ Ok(Some(field))
+ }
+ r => r,
+ };
+ }
+
+ // Regular three-level list: the repeated group adds one repetition level and one
+ // definition level, and its single child is the element
+ let item_type = &items[0];
+ let rep_level = context.rep_level + 1;
+ let def_level = def_level + 1;
+
+ let new_context = VisitorContext {
+ def_level,
+ rep_level,
+ data_type: arrow_field.map(|f| f.data_type().clone()),
+ };
+
+ match self.dispatch(item_type, new_context) {
+ Ok(Some(item)) => {
+ let item_field = Box::new(convert_field(item_type, &item, arrow_field));
+
+ // Use arrow type as hint for index size (a FixedSizeList hint also keeps its length)
+ let arrow_type = match context.data_type {
+ Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
+ Some(DataType::FixedSizeList(_, len)) => {
+ DataType::FixedSizeList(item_field, len)
+ }
+ _ => DataType::List(item_field),
+ };
+
+ Ok(Some(ParquetField {
+ rep_level,
+ def_level,
+ nullable,
+ arrow_type,
+ field_type: ParquetFieldType::Group {
+ children: vec![item],
+ },
+ }))
+ }
+ r => r,
+ }
+ }
+
+    /// Visits `cur_type` with the visitor method matching its shape and annotation
+    fn dispatch(
+        &mut self,
+        cur_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // Leaf columns are always visited as primitives
+        if cur_type.is_primitive() {
+            return self.visit_primitive(cur_type, context);
+        }
+
+        // Groups are routed by converted type: MAP_KEY_VALUE is accepted in place of
+        // MAP, and any other group is treated as a struct
+        let converted = cur_type.get_basic_info().converted_type();
+        if matches!(converted, ConvertedType::LIST) {
+            self.visit_list(cur_type, context)
+        } else if matches!(converted, ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE) {
+            self.visit_map(cur_type, context)
+        } else {
+            self.visit_struct(cur_type, context)
+        }
+    }
+}
+
+/// Builds the arrow [`Field`] for a child column
+///
+/// The field is named after `parquet_type` and takes its type and nullability from the
+/// converted `field`. When an `arrow_hint` is supplied, its metadata is copied across.
+/// Its dictionary id and ordering are also copied, provided the converted type is a
+/// dictionary and the hint carries both.
+fn convert_field(
+    parquet_type: &Type,
+    field: &ParquetField,
+    arrow_hint: Option<&Field>,
+) -> Field {
+    let name = parquet_type.name();
+    let data_type = field.arrow_type.clone();
+    let nullable = field.nullable;
+
+    let hint = match arrow_hint {
+        Some(hint) => hint,
+        None => return Field::new(name, data_type, nullable),
+    };
+
+    // Dictionary settings only carry over when the inferred type is itself a dictionary
+    let is_dictionary = matches!(data_type, DataType::Dictionary(_, _));
+    let converted = match (hint.dict_id(), hint.dict_is_ordered()) {
+        (Some(dict_id), Some(dict_is_ordered)) if is_dictionary => {
+            Field::new_dict(name, data_type, nullable, dict_id, dict_is_ordered)
+        }
+        _ => Field::new(name, data_type, nullable),
+    };
+
+    converted.with_metadata(hint.metadata().cloned())
+}
+
+/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
+/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
+/// [`Schema`] embedded in the parquet metadata
+///
+/// Note: This does not support out of order column projection
+///
+/// # Errors
+///
+/// Returns an error in either of these cases:
+/// - `leaf_columns` is not in ascending order, repeats an index, or contains an index
+///   that is not a leaf column of `schema`
+/// - converting the schema fails
+pub fn convert_schema<T: IntoIterator<Item = usize>>(
+    schema: &SchemaDescriptor,
+    leaf_columns: T,
+    embedded_arrow_schema: Option<&Schema>,
+) -> Result<Option<ParquetField>> {
+    let num_columns = schema.num_columns();
+    let mut leaf_mask = vec![false; num_columns];
+    let mut last_idx = 0;
+    for i in leaf_columns {
+        if i < last_idx {
+            return Err(general_err!("out of order projection is not supported"));
+        }
+
+        // An index past the last leaf column is caller error: report it instead of
+        // panicking on the mask lookup
+        let projected = leaf_mask.get_mut(i).ok_or_else(|| {
+            general_err!(
+                "column index {} out of bounds, schema has {} leaf columns",
+                i,
+                num_columns
+            )
+        })?;
+
+        if *projected {
+            return Err(general_err!(
+                "repeated column projection is not supported, column {} appeared multiple times",
+                i
+            ));
+        }
+
+        last_idx = i;
+        *projected = true;
+    }
+
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: leaf_mask,
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+    };
+
+    visitor.dispatch(&schema.root_schema_ptr(), context)
+}
+
+/// Computes the [`ParquetField`] for the provided `parquet_type`, projecting all of its
+/// leaf columns
+///
+/// # Errors
+///
+/// Returns an error if the type cannot be converted, or if the conversion produces no
+/// field at all (for example a group type with no children)
+pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
+    // Project every leaf. The mask gets one entry per leaf column, so a group type is
+    // fully covered; for a primitive type this is the single entry used previously.
+    // NOTE(review): presumes the mask is indexed by `next_col_idx` as leaves are visited
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: vec![true; num_leaf_columns(parquet_type)],
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: None,
+    };
+
+    // Report an empty conversion as an error rather than panicking on `unwrap`
+    visitor.dispatch(parquet_type, context)?.ok_or_else(|| {
+        arrow_err!(
+            "Failed to convert parquet type {}: no arrow field was produced",
+            parquet_type.name()
+        )
+    })
+}
+
+/// Returns the number of primitive (leaf) types within `parquet_type`, counting
+/// `parquet_type` itself when it is primitive
+fn num_leaf_columns(parquet_type: &Type) -> usize {
+    if parquet_type.is_primitive() {
+        1
+    } else {
+        parquet_type
+            .get_fields()
+            .iter()
+            .map(|field| num_leaf_columns(field))
+            .sum()
+    }
+}
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
new file mode 100644
index 000000000..0816b6b2f
--- /dev/null
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+ ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory. The type is always inferred
+/// from the parquet schema first, and the hint only refines it where `apply_hint`
+/// accepts it.
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let inferred = from_parquet(parquet_type)?;
+    let resolved = match arrow_type_hint {
+        None => inferred,
+        Some(hint) => apply_hint(inferred, hint),
+    };
+    Ok(resolved)
+}
+
+/// Reconciles the type inferred from parquet with a `hint` taken from the embedded
+/// arrow schema, to faithfully reproduce the data as it was written into parquet
+///
+/// The hint is used only where it refines something parquet cannot express: time
+/// units, timezones, offset sizes, interval units, Date64, or dictionary encoding.
+/// Otherwise the type inferred from parquet is returned unchanged.
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
+    // Potentially preserve dictionary encoding: apply the hint's value type, and keep
+    // the dictionary only if the result matches that value type exactly
+    if let DataType::Dictionary(_, value) = &hint {
+        let hinted = apply_hint(parquet, value.as_ref().clone());
+        return if &hinted == value.as_ref() { hint } else { hinted };
+    }
+
+    let prefer_hint = match (&parquet, &hint) {
+        // Not all time units can be represented as LogicalType / ConvertedType
+        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _))
+        | (DataType::Int32, DataType::Time32(_))
+        | (DataType::Int64, DataType::Time64(_)) => true,
+        // Date64 has no LogicalType / ConvertedType; Date32 is coerced back (#1666)
+        (DataType::Int64 | DataType::Date32, DataType::Date64) => true,
+        // Recover the timezone, provided the time unit agrees
+        (DataType::Timestamp(found, None), DataType::Timestamp(wanted, Some(_))) => {
+            found == wanted
+        }
+        // Recover the offset size
+        (DataType::Utf8, DataType::LargeUtf8)
+        | (DataType::Binary, DataType::LargeBinary) => true,
+        // Recover the interval time unit (#1666)
+        (DataType::Interval(_), DataType::Interval(_)) => true,
+        _ => false,
+    };
+
+    if prefer_hint {
+        hint
+    } else {
+        parquet
+    }
+}
+
+/// Infers the arrow [`DataType`] of a parquet primitive column from its physical type
+/// and any logical / converted type annotation
+///
+/// # Errors
+///
+/// Returns an error in either of these cases:
+/// - the annotation is not supported for the physical type
+/// - `parquet_type` is a group; only primitive (leaf) types carry a physical type
+fn from_parquet(parquet_type: &Type) -> Result<DataType> {
+    match parquet_type {
+        Type::PrimitiveType {
+            physical_type,
+            basic_info,
+            type_length,
+            scale,
+            precision,
+            ..
+        } => match physical_type {
+            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
+            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
+            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+            PhysicalType::FLOAT => Ok(DataType::Float32),
+            PhysicalType::DOUBLE => Ok(DataType::Float64),
+            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
+            }
+        },
+        // Reachable through the public `convert_primitive`: report misuse instead of panicking
+        Type::GroupType { .. } => Err(arrow_err!(
+            "Cannot convert group type {} to a primitive arrow type",
+            parquet_type.name()
+        )),
+    }
+}
+
+/// Builds a [`DataType::Decimal`] from parquet's signed `scale` and `precision`
+///
+/// Returns an error if either value is negative; the scale is checked first.
+fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+    let checked_scale = match scale.try_into() {
+        Ok(value) => value,
+        Err(_) => return Err(arrow_err!("scale cannot be negative: {}", scale)),
+    };
+    let checked_precision = match precision.try_into() {
+        Ok(value) => value,
+        Err(_) => return Err(arrow_err!("precision cannot be negative: {}", precision)),
+    };
+    Ok(DataType::Decimal(checked_precision, checked_scale))
+}
+
+/// Infers the arrow [`DataType`] of an INT32 column from its annotation
+///
+/// If a logical type is present, it alone decides the result. Otherwise the legacy
+/// converted type is used, and an unannotated column reads as [`DataType::Int32`].
+fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    let logical = info.logical_type();
+    let converted = info.converted_type();
+    let unsupported = || {
+        arrow_err!(
+            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )
+    };
+
+    if let Some(logical_type) = &logical {
+        return match logical_type {
+            LogicalType::Integer {
+                bit_width,
+                is_signed,
+            } => match (*bit_width, *is_signed) {
+                (8, true) => Ok(DataType::Int8),
+                (16, true) => Ok(DataType::Int16),
+                (32, true) => Ok(DataType::Int32),
+                (8, false) => Ok(DataType::UInt8),
+                (16, false) => Ok(DataType::UInt16),
+                (32, false) => Ok(DataType::UInt32),
+                _ => Err(arrow_err!(
+                    "Cannot create INT32 physical type from {:?}",
+                    logical_type
+                )),
+            },
+            LogicalType::Decimal { scale, precision } => decimal_type(*scale, *precision),
+            LogicalType::Date => Ok(DataType::Date32),
+            LogicalType::Time { unit, .. } => match unit {
+                ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+                _ => Err(arrow_err!(
+                    "Cannot create INT32 physical type from {:?}",
+                    unit
+                )),
+            },
+            // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+            LogicalType::Unknown => Ok(DataType::Null),
+            _ => Err(unsupported()),
+        };
+    }
+
+    match converted {
+        ConvertedType::NONE | ConvertedType::INT_32 => Ok(DataType::Int32),
+        ConvertedType::UINT_8 => Ok(DataType::UInt8),
+        ConvertedType::UINT_16 => Ok(DataType::UInt16),
+        ConvertedType::UINT_32 => Ok(DataType::UInt32),
+        ConvertedType::INT_8 => Ok(DataType::Int8),
+        ConvertedType::INT_16 => Ok(DataType::Int16),
+        ConvertedType::DATE => Ok(DataType::Date32),
+        ConvertedType::TIME_MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
+        ConvertedType::DECIMAL => decimal_type(scale, precision),
+        _ => Err(unsupported()),
+    }
+}
+
+/// Infers the arrow [`DataType`] of an INT64 column from its annotation
+///
+/// If a logical type is present, it alone decides the result. Otherwise the legacy
+/// converted type is used, and an unannotated column reads as [`DataType::Int64`].
+fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    let logical = info.logical_type();
+    let converted = info.converted_type();
+    let unsupported = || {
+        arrow_err!(
+            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )
+    };
+
+    if let Some(logical_type) = &logical {
+        return match logical_type {
+            // Only 64-bit integer annotations fit an INT64 column
+            LogicalType::Integer {
+                bit_width: 64,
+                is_signed,
+            } => {
+                if *is_signed {
+                    Ok(DataType::Int64)
+                } else {
+                    Ok(DataType::UInt64)
+                }
+            }
+            LogicalType::Time { unit, .. } => match unit {
+                ParquetTimeUnit::MILLIS(_) => {
+                    Err(arrow_err!("Cannot create INT64 from MILLIS time unit"))
+                }
+                ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+                ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
+            },
+            LogicalType::Timestamp {
+                is_adjusted_to_u_t_c,
+                unit,
+            } => {
+                let time_unit = match unit {
+                    ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
+                    ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
+                    ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
+                };
+                // UTC-adjusted timestamps are tagged with a "UTC" timezone
+                let timezone = if *is_adjusted_to_u_t_c {
+                    Some("UTC".to_string())
+                } else {
+                    None
+                };
+                Ok(DataType::Timestamp(time_unit, timezone))
+            }
+            LogicalType::Decimal { scale, precision } => decimal_type(*scale, *precision),
+            _ => Err(unsupported()),
+        };
+    }
+
+    match converted {
+        ConvertedType::NONE | ConvertedType::INT_64 => Ok(DataType::Int64),
+        ConvertedType::UINT_64 => Ok(DataType::UInt64),
+        ConvertedType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
+        ConvertedType::TIMESTAMP_MILLIS => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        ConvertedType::TIMESTAMP_MICROS => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        ConvertedType::DECIMAL => decimal_type(scale, precision),
+        _ => Err(unsupported()),
+    }
+}
+
+/// Infers the arrow [`DataType`] of a BYTE_ARRAY column
+///
+/// String / UTF8 columns read as [`DataType::Utf8`]. JSON, BSON, ENUM and
+/// unannotated columns read as [`DataType::Binary`]. Any other annotation is an error.
+fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::String), _) | (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
+        (Some(LogicalType::Json | LogicalType::Bson | LogicalType::Enum), _)
+        | (
+            None,
+            ConvertedType::NONE
+            | ConvertedType::JSON
+            | ConvertedType::BSON
+            | ConvertedType::ENUM,
+        ) => Ok(DataType::Binary),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+/// Infers the arrow [`DataType`] of a FIXED_LEN_BYTE_ARRAY column of `type_length` bytes
+///
+/// Decimal annotations map to [`DataType::Decimal`] and INTERVAL to
+/// [`DataType::Interval`]. Anything else maps to [`DataType::FixedSizeBinary`].
+///
+/// # Errors
+///
+/// Returns an error in either of these cases:
+/// - an INTERVAL column whose length is not the 12 bytes required by the parquet
+///   specification
+/// - a decimal with a negative scale or precision
+fn from_fixed_len_byte_array(
+    info: &BasicTypeInfo,
+    scale: i32,
+    precision: i32,
+    type_length: i32,
+) -> Result<DataType> {
+    // TODO: This should check the type length for the decimal type
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::INTERVAL) => {
+            // The specification requires INTERVAL to annotate a 12 byte array (three
+            // 4-byte fields); any other width cannot be decoded as an interval
+            if type_length != 12 {
+                return Err(arrow_err!(
+                    "INTERVAL must annotate a FIXED_LEN_BYTE_ARRAY of length 12, got {}",
+                    type_length
+                ));
+            }
+            // There is currently no reliable way of determining which IntervalUnit
+            // to return. Thus without the original Arrow schema, the results
+            // would be incorrect if all 12 bytes of the interval are populated
+            Ok(DataType::Interval(IntervalUnit::DayTime))
+        }
+        _ => Ok(DataType::FixedSizeBinary(type_length)),
+    }
+}
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index be1a22192..fcbb846f1 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -135,6 +135,14 @@ macro_rules! eof_err {
($fmt:expr, $($args:expr),*) => (ParquetError::EOF(format!($fmt, $($args),*)));
}
+/// Creates a `ParquetError::ArrowError` from a message
+///
+/// When arguments follow the message, it is formatted with `format!`; a trailing comma
+/// is accepted. Forms taking an extra source error are deliberately not provided,
+/// because `ParquetError::ArrowError` is built here from a single `String`.
+macro_rules! arrow_err {
+    ($fmt:expr) => (ParquetError::ArrowError($fmt.to_owned()));
+    ($fmt:expr, $($args:expr),* $(,)?) => (ParquetError::ArrowError(format!($fmt, $($args),*)));
+}
+
// ----------------------------------------------------------------------
// Convert parquet error into other errors