You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/12 13:16:15 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

alamb commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r871336584


##########
parquet/src/arrow/arrow_writer.rs:
##########
@@ -1058,7 +1058,7 @@ mod tests {
         let stocks_field = Field::new(
             "stocks",
             DataType::Map(
-                Box::new(Field::new("entries", entries_struct_type, false)),
+                Box::new(Field::new("entries", entries_struct_type, true)),

Review Comment:
   why this change?



##########
parquet/src/arrow/schema.rs:
##########
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
             .with_repetition(repetition)
             .build(),
-        DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
-            name,
-            PhysicalType::INT64,
-        )
-        .with_logical_type(Some(LogicalType::Timestamp {
-            is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
-            unit: match time_unit {
-                TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
-                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
-            },
-        }))
-        .with_repetition(repetition)
-        .build(),
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_repetition(repetition)
+                .build()
+        }
+        DataType::Timestamp(time_unit, _) => {
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_logical_type(Some(LogicalType::Timestamp {
+                    is_adjusted_to_u_t_c: false,
+                    unit: match time_unit {
+                        TimeUnit::Second => unreachable!(),
+                        TimeUnit::Millisecond => {
+                            ParquetTimeUnit::MILLIS(Default::default())
+                        }
+                        TimeUnit::Microsecond => {
+                            ParquetTimeUnit::MICROS(Default::default())
+                        }
+                        TimeUnit::Nanosecond => {
+                            ParquetTimeUnit::NANOS(Default::default())
+                        }
+                    },
+                }))
+                .with_repetition(repetition)
+                .build()
+        }
         DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        // date64 is cast to date32
+        // date64 is cast to date32 (#1666)
         DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
+        DataType::Time32(TimeUnit::Second) => {
+            // Cannot represent seconds in LogicalType

Review Comment:
   Confirming this is a (seemingly better) change in behavior, right? Now no logical type is stored for arrow `Time32(seconds)`, whereas previously the logical type of `Time(millis)` was stored.



##########
parquet/src/arrow/schema.rs:
##########
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
             .with_repetition(repetition)
             .build(),
-        DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
-            name,
-            PhysicalType::INT64,
-        )
-        .with_logical_type(Some(LogicalType::Timestamp {
-            is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
-            unit: match time_unit {
-                TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
-                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
-            },
-        }))
-        .with_repetition(repetition)
-        .build(),
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_repetition(repetition)
+                .build()
+        }
+        DataType::Timestamp(time_unit, _) => {
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_logical_type(Some(LogicalType::Timestamp {
+                    is_adjusted_to_u_t_c: false,
+                    unit: match time_unit {
+                        TimeUnit::Second => unreachable!(),
+                        TimeUnit::Millisecond => {
+                            ParquetTimeUnit::MILLIS(Default::default())
+                        }
+                        TimeUnit::Microsecond => {
+                            ParquetTimeUnit::MICROS(Default::default())
+                        }
+                        TimeUnit::Nanosecond => {
+                            ParquetTimeUnit::NANOS(Default::default())
+                        }
+                    },
+                }))
+                .with_repetition(repetition)
+                .build()
+        }
         DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        // date64 is cast to date32
+        // date64 is cast to date32 (#1666)

Review Comment:
   👍 https://github.com/apache/arrow-rs/issues/1666



##########
parquet/src/arrow/schema.rs:
##########
@@ -1679,7 +1168,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)

Review Comment:
   I didn't see a test for the (new) error case -- I suggest adding one so we don't get accidental regressions



##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory.
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses a type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
+    match (&parquet, &hint) {
+        // Not all time units can be represented as LogicalType / ConvertedType
+        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
+        (DataType::Int32, DataType::Time32(_)) => hint,
+        (DataType::Int64, DataType::Time64(_)) => hint,
+
+        // Date64 doesn't have a corresponding LogicalType / ConvertedType
+        (DataType::Int64, DataType::Date64) => hint,
+
+        // Coerce Date32 back to Date64 (#1666)
+        (DataType::Date32, DataType::Date64) => hint,
+
+        // Determine timezone
+        (DataType::Timestamp(p, None), DataType::Timestamp(h, Some(_))) if p == h => hint,
+
+        // Determine offset size
+        (DataType::Utf8, DataType::LargeUtf8) => hint,
+        (DataType::Binary, DataType::LargeBinary) => hint,
+
+        // Determine interval time unit (#1666)
+        (DataType::Interval(_), DataType::Interval(_)) => hint,
+
+        // Potentially preserve dictionary encoding
+        (_, DataType::Dictionary(_, value)) => {
+            // Apply hint to inner type
+            let hinted = apply_hint(parquet, value.as_ref().clone());
+
+            // If matches dictionary value - preserve dictionary
+            // otherwise use hinted inner type
+            match &hinted == value.as_ref() {
+                true => hint,
+                false => hinted,
+            }
+        }
+        _ => parquet,
+    }
+}
+
+fn from_parquet(parquet_type: &Type) -> Result<DataType> {
+    match parquet_type {
+        Type::PrimitiveType {
+            physical_type,
+            basic_info,
+            type_length,
+            scale,
+            precision,
+            ..
+        } => match physical_type {
+            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
+            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
+            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+            PhysicalType::FLOAT => Ok(DataType::Float32),
+            PhysicalType::DOUBLE => Ok(DataType::Float64),
+            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
+            }
+        },
+        Type::GroupType { .. } => unreachable!(),
+    }
+}
+
+fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+    let scale = scale
+        .try_into()
+        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+    let precision = precision
+        .try_into()
+        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
+
+    Ok(DataType::Decimal(precision, scale))
+}
+
+fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int32),
+        (
+            Some(
+                ref t @ LogicalType::Integer {
+                    bit_width,
+                    is_signed,
+                },
+            ),
+            _,
+        ) => match (bit_width, is_signed) {
+            (8, true) => Ok(DataType::Int8),
+            (16, true) => Ok(DataType::Int16),
+            (32, true) => Ok(DataType::Int32),
+            (8, false) => Ok(DataType::UInt8),
+            (16, false) => Ok(DataType::UInt16),
+            (32, false) => Ok(DataType::UInt32),
+            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
+        },
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+            _ => Err(arrow_err!(
+                "Cannot create INT32 physical type from {:?}",
+                unit
+            )),
+        },
+        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
+        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
+        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
+        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
+        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
+        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
+        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
+        (None, ConvertedType::DATE) => Ok(DataType::Date32),
+        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int64),
+        (
+            Some(LogicalType::Integer {
+                bit_width,
+                is_signed,
+            }),
+            _,
+        ) if bit_width == 64 => match is_signed {
+            true => Ok(DataType::Int64),
+            false => Ok(DataType::UInt64),
+        },
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => {
+                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
+            }
+            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
+        },
+        (
+            Some(LogicalType::Timestamp {
+                is_adjusted_to_u_t_c,
+                unit,
+            }),
+            _,
+        ) => Ok(DataType::Timestamp(
+            match unit {
+                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
+                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
+                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
+            },
+            if is_adjusted_to_u_t_c {
+                Some("UTC".to_string())
+            } else {
+                None
+            },
+        )),
+        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
+        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
+        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+        (None, ConvertedType::TIMESTAMP_MILLIS) => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        (None, ConvertedType::TIMESTAMP_MICROS) => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
+        (Some(LogicalType::Json), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
+        (None, ConvertedType::NONE) => Ok(DataType::Binary),
+        (None, ConvertedType::JSON) => Ok(DataType::Binary),
+        (None, ConvertedType::BSON) => Ok(DataType::Binary),
+        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
+        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_fixed_len_byte_array(
+    info: &BasicTypeInfo,
+    scale: i32,
+    precision: i32,
+    type_length: i32,
+) -> Result<DataType> {
+    // TODO: This should check the type length for the decimal and interval types
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::INTERVAL) => {
+            // There is currently no reliable way of determining which IntervalUnit
+            // to return. Thus without the original Arrow schema, the results
+            // would be incorrect if all 12 bytes of the interval are populated
+            Ok(DataType::Interval(IntervalUnit::DayTime))
+        }
+        _ => Ok(DataType::FixedSizeBinary(type_length)),
+    }
+}

Review Comment:
   also here, some explicit tests showing conversions as a way to document expected behavior would be really nice



##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory.
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses a type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {

Review Comment:
   I like this centralization of hinting logic -- it makes it easy to understand where arrow and parquet type systems aren't compatible



##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -1050,6 +1050,41 @@ mod tests {
         for batch in record_batch_reader {
             batch.unwrap();
         }
+
+        let projected_reader = arrow_reader

Review Comment:
   ```suggestion
           // Test for https://github.com/apache/arrow-rs/issues/1654 and
           // https://github.com/apache/arrow-rs/issues/1652
           let projected_reader = arrow_reader
   ```



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(

Review Comment:
   An `ArrayReaderBuilder` seems like a more 'common' interface to me, but free functions are fine too, and since this stuff isn't public we can always change it in the future if we want to



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),
+    ));
 
-        let arrow_type: Option<ArrowType> = self
-            .get_arrow_field(&cur_type, context)
-            .map(|f| f.data_type().clone());
+    let page_iterator = row_groups.column_chunks(col_idx)?;
+    let null_mask_only = field.def_level == 1 && field.nullable;
+    let arrow_type = Some(field.arrow_type.clone());
 
-        match cur_type.get_physical_type() {
-            PhysicalType::BOOLEAN => Ok(Box::new(
-                PrimitiveArrayReader::<BoolType>::new_with_options(
+    match physical_type {
+        PhysicalType::BOOLEAN => Ok(Box::new(
+            PrimitiveArrayReader::<BoolType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT32 => {
+            if let Some(DataType::Null) = arrow_type {
+                Ok(Box::new(NullArrayReader::<Int32Type>::new(
                     page_iterator,
                     column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT32 => {
-                if let Some(ArrowType::Null) = arrow_type {
-                    Ok(Box::new(NullArrayReader::<Int32Type>::new(
+                )?))
+            } else {
+                Ok(Box::new(
+                    PrimitiveArrayReader::<Int32Type>::new_with_options(
                         page_iterator,
                         column_desc,
-                    )?))
+                        arrow_type,
+                        null_mask_only,
+                    )?,
+                ))
+            }
+        }
+        PhysicalType::INT64 => Ok(Box::new(
+            PrimitiveArrayReader::<Int64Type>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT96 => {
+            // get the optional timezone information from arrow type
+            let timezone = arrow_type.as_ref().and_then(|data_type| {
+                if let DataType::Timestamp(_, tz) = data_type {
+                    tz.clone()
                 } else {
-                    Ok(Box::new(
-                        PrimitiveArrayReader::<Int32Type>::new_with_options(
-                            page_iterator,
-                            column_desc,
-                            arrow_type,
-                            null_mask_only,
-                        )?,
-                    ))
+                    None
                 }
-            }
-            PhysicalType::INT64 => Ok(Box::new(
-                PrimitiveArrayReader::<Int64Type>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT96 => {
-                // get the optional timezone information from arrow type
-                let timezone = arrow_type.as_ref().and_then(|data_type| {
-                    if let ArrowType::Timestamp(_, tz) = data_type {
-                        tz.clone()
-                    } else {
-                        None
-                    }
-                });
-                let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            });
+            let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            Ok(Box::new(ComplexObjectArrayReader::<
+                Int96Type,
+                Int96Converter,
+            >::new(
+                page_iterator,
+                column_desc,
+                converter,
+                arrow_type,
+            )?))
+        }
+        PhysicalType::FLOAT => Ok(Box::new(
+            PrimitiveArrayReader::<FloatType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::DOUBLE => Ok(Box::new(
+            PrimitiveArrayReader::<DoubleType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::BYTE_ARRAY => match arrow_type {
+            Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+            _ => make_byte_array_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+        },
+        PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
+            DataType::Decimal(precision, scale) => {
+                let converter = DecimalConverter::new(DecimalArrayConverter::new(
+                    precision as i32,
+                    scale as i32,
+                ));
                 Ok(Box::new(ComplexObjectArrayReader::<
-                    Int96Type,
-                    Int96Converter,
+                    FixedLenByteArrayType,
+                    DecimalConverter,
                 >::new(
                     page_iterator,
                     column_desc,
                     converter,
                     arrow_type,
                 )?))
             }
-            PhysicalType::FLOAT => Ok(Box::new(
-                PrimitiveArrayReader::<FloatType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::DOUBLE => Ok(Box::new(
-                PrimitiveArrayReader::<DoubleType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::BYTE_ARRAY => match arrow_type {
-                Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+            DataType::Interval(IntervalUnit::DayTime) => {
+                let converter =
+                    IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-                _ => make_byte_array_reader(
+                )?))
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                let converter =
+                    IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-            },
-            PhysicalType::FIXED_LEN_BYTE_ARRAY
-                if cur_type.get_basic_info().converted_type()
-                    == ConvertedType::DECIMAL =>
-            {
-                let converter = DecimalConverter::new(DecimalArrayConverter::new(
-                    cur_type.get_precision(),
-                    cur_type.get_scale(),
-                ));
+                )?))
+            }
+            _ => {
+                let converter =
+                    FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
                 Ok(Box::new(ComplexObjectArrayReader::<
                     FixedLenByteArrayType,
-                    DecimalConverter,
+                    FixedLenBinaryConverter,
                 >::new(
                     page_iterator,
                     column_desc,
                     converter,
                     arrow_type,
                 )?))
             }
-            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                let byte_width = match *cur_type {
-                    Type::PrimitiveType {
-                        ref type_length, ..
-                    } => *type_length,
-                    _ => {
-                        return Err(ArrowError(
-                            "Expected a physical type, not a group type".to_string(),
-                        ))
-                    }
-                };
-                if cur_type.get_basic_info().converted_type() == ConvertedType::INTERVAL {
-                    if byte_width != 12 {
-                        return Err(ArrowError(format!(
-                            "Parquet interval type should have length of 12, found {}",
-                            byte_width
-                        )));
-                    }
-                    match arrow_type {
-                        Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
-                            let converter = IntervalDayTimeConverter::new(
-                                IntervalDayTimeArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
-                            let converter = IntervalYearMonthConverter::new(
-                                IntervalYearMonthArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(t) => Err(ArrowError(format!(
-                            "Cannot write a Parquet interval to {:?}",
-                            t
-                        ))),
-                        None => {
-                            // we do not support an interval not matched to an Arrow type,
-                            // because we risk data loss as we won't know which of the 12 bytes
-                            // are or should be populated
-                            Err(ArrowError(
-                                "Cannot write a Parquet interval with no Arrow type specified.
-                                There is a risk of data loss as Arrow either supports YearMonth or
-                                DayTime precision. Without the Arrow type, we cannot infer the type.
-                                ".to_string()
-                            ))
-                        }
-                    }
-                } else {
-                    let converter = FixedLenBinaryConverter::new(
-                        FixedSizeArrayConverter::new(byte_width),
-                    );
-                    Ok(Box::new(ComplexObjectArrayReader::<
-                        FixedLenByteArrayType,
-                        FixedLenBinaryConverter,
-                    >::new(
-                        page_iterator,
-                        column_desc,
-                        converter,
-                        arrow_type,
-                    )?))
-                }
-            }
-        }
-    }
-
-    /// Constructs struct array reader without considering repetition.
-    fn build_for_struct_type_inner(
-        &mut self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut fields = Vec::with_capacity(cur_type.get_fields().len());
-        let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
-
-        for child in cur_type.get_fields() {
-            if let Some(child_reader) = self.dispatch(child.clone(), context)? {
-                // TODO: this results in calling get_arrow_field twice, it could be reused
-                // from child_reader above, by making child_reader carry its `Field`
-                let mut struct_context = context.clone();
-                struct_context.path.append(vec![child.name().to_string()]);
-                let field = match self.get_arrow_field(child, &struct_context) {
-                    Some(f) => f.clone(),
-                    _ => Field::new(
-                        child.name(),
-                        child_reader.get_data_type().clone(),
-                        child.is_optional(),
-                    ),
-                };
-                fields.push(field);
-                children_reader.push(child_reader);
-            }
-        }
-
-        if !fields.is_empty() {
-            let arrow_type = ArrowType::Struct(fields);
-            Ok(Some(Box::new(StructArrayReader::new(
-                arrow_type,
-                children_reader,
-                context.def_level,
-                context.rep_level,
-            ))))
-        } else {
-            Ok(None)
-        }
+        },
     }
+}
 
-    fn get_arrow_field(
-        &self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Option<&Field> {
-        let parts: Vec<&str> = context
-            .path
-            .parts()
-            .iter()
-            .map(|x| -> &str { x })
-            .collect::<Vec<&str>>();
-
-        // If the parts length is one it'll have the top level "schema" type. If
-        // it's two then it'll be a top-level type that we can get from the arrow
-        // schema directly.
-        if parts.len() <= 2 {
-            self.arrow_schema.field_with_name(cur_type.name()).ok()
-        } else {
-            // If it's greater than two then we need to traverse the type path
-            // until we find the actual field we're looking for.
-            let mut field: Option<&Field> = None;
-
-            for (i, part) in parts.iter().enumerate().skip(1) {
-                if i == 1 {
-                    field = self.arrow_schema.field_with_name(part).ok();
-                } else if let Some(f) = field {
-                    match f.data_type() {
-                        ArrowType::Struct(fields) => {
-                            field = fields.iter().find(|f| f.name() == part)
-                        }
-                        ArrowType::List(list_field) => match list_field.data_type() {
-                            ArrowType::Struct(fields) => {
-                                field = fields.iter().find(|f| f.name() == part)
-                            }
-                            _ => field = Some(list_field.as_ref()),
-                        },
-                        _ => field = None,
-                    }
-                } else {
-                    field = None;
-                }
-            }
-            field
-        }
-    }
+/// Constructs struct array reader without considering repetition.

Review Comment:
   I don't understand why it wouldn't consider repetition (i.e., I don't understand why this piece of information is valuable -- not to say it isn't valuable, I just don't know enough to see why it is). Perhaps some more comments would help.



##########
parquet/src/arrow/schema.rs:
##########
@@ -1261,7 +746,7 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
+                DataType::List(Box::new(Field::new("str", DataType::Utf8, false))),

Review Comment:
   I think the comments need to be updated to match the changes in the code:
   
   ```rust
           // // List<String> (list nullable, elements non-null)
           // optional group my_list (LIST) {
           //   repeated group element {
           //     required binary str (UTF8);
           //   };
           // }
   ```



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),

Review Comment:
   ```suggestion
           // We don't track the column path in ParquetField, as doing so adds a potential source of bugs
           // when the arrow mapping converts more than one level of the parquet schema into a single arrow field.
           //
           // None of the readers actually use the column path, but ColumnDescriptor requires one, so pass an empty placeholder
           ColumnPath::new(vec![]),
   ```



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    pub rep_level: i16,
+    pub def_level: i16,
+    pub nullable: bool,
+    pub arrow_type: DataType,
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    pub fn children(&self) -> Option<&[ParquetField]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        col_idx: usize,
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to extract the
+/// necessary information to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {

Review Comment:
   I really like this structure to encapsulate the logic for using the embedded schema. 👍 



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    /// The level which represents an insertion into the current list
+    /// i.e. guaranteed to be > 0 for a list type
+    pub rep_level: i16,
+    /// The level at which this field is fully defined,
+    /// i.e. guaranteed to be > 0 for a nullable type
+    pub def_level: i16,
+    /// Whether this field is nullable
+    pub nullable: bool,
+    /// The arrow type of the column data
+    ///
+    /// Note: In certain cases the data stored in parquet may have been coerced
+    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
+    pub arrow_type: DataType,
+    /// The type of this field
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    /// Converts `self` into an arrow list, with its current type as the field type
+    ///
+    /// This is used to convert repeated columns, into their arrow representation
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    /// Returns a list of [`ParquetField`] children if this is a group type
+    pub fn children(&self) -> Option<&[Self]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        /// The index of the column in parquet
+        col_idx: usize,
+        /// The type of the column in parquet
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+/// Encodes the context of the parent of the field currently under consideration
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    /// Compute the resulting definition level, repetition level and nullability
+    /// for a child field with the given [`Repetition`]
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {
+    fn visit_primitive(
+        &mut self,
+        primitive_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let col_idx = self.next_col_idx;
+        self.next_col_idx += 1;
+
+        if !self.column_mask[col_idx] {
+            return Ok(None);
+        }
+
+        let repetition = get_repetition(primitive_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
+
+        let primitive_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type,
+            field_type: ParquetFieldType::Primitive {
+                primitive_type: primitive_type.clone(),
+                col_idx,
+            },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
+            _ => primitive_field,
+        }))
+    }
+
+    fn visit_struct(
+        &mut self,
+        struct_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // The root type will not have a repetition level
+        let repetition = get_repetition(struct_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let parquet_fields = struct_type.get_fields();
+
+        // Extract the arrow fields
+        let arrow_fields = match &context.data_type {
+            Some(DataType::Struct(fields)) => {
+                if fields.len() != parquet_fields.len() {
+                    return Err(arrow_err!(
+                        "incompatible arrow schema, expected {} struct fields got {}",
+                        parquet_fields.len(),
+                        fields.len()
+                    ));
+                }
+                Some(fields)
+            }
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected struct got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        let mut child_fields = Vec::with_capacity(parquet_fields.len());
+        let mut children = Vec::with_capacity(parquet_fields.len());
+
+        // Perform a DFS of children
+        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
+            let data_type = match arrow_fields {
+                Some(fields) => {
+                    let field = &fields[idx];
+                    if field.name() != parquet_field.name() {
+                        return Err(arrow_err!(
+                            "incompatible arrow schema, expected field named {} got {}",
+                            parquet_field.name(),
+                            field.name()
+                        ));
+                    }
+                    Some(field.data_type().clone())
+                }
+                None => None,
+            };
+
+            let arrow_field = arrow_fields.map(|x| &x[idx]);
+            let child_ctx = VisitorContext {
+                rep_level,
+                def_level,
+                data_type,
+            };
+
+            if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
+                // The child type returned may be different from what is encoded in the arrow
+                // schema in the event of a mismatch or a projection
+                child_fields.push(convert_field(parquet_field, &child, arrow_field));
+                children.push(child);
+            }
+        }
+
+        if children.is_empty() {
+            return Ok(None);
+        }
+
+        let struct_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type: DataType::Struct(child_fields),
+            field_type: ParquetFieldType::Group { children },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
+            _ => struct_field,
+        }))
+    }
+
+    fn visit_map(
+        &mut self,
+        map_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let rep_level = context.rep_level + 1;
+        let (def_level, nullable) = match get_repetition(map_type) {
+            Repetition::REQUIRED => (context.def_level + 1, false),
+            Repetition::OPTIONAL => (context.def_level + 2, true),
+            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
+        };
+
+        if map_type.get_fields().len() != 1 {
+            return Err(arrow_err!(
+                "Map field must have exactly one key_value child, found {}",
+                map_type.get_fields().len()
+            ));
+        }
+
+        // Add map entry (key_value) to context
+        let map_key_value = &map_type.get_fields()[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(arrow_err!("Child of map field must be repeated"));
+        }
+
+        if map_key_value.get_fields().len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(arrow_err!(
+                "Child of map field must have two children, found {}",
+                map_key_value.get_fields().len()
+            ));
+        }
+
+        // Get key and value, and create context for each
+        let map_key = &map_key_value.get_fields()[0];
+        let map_value = &map_key_value.get_fields()[1];
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(arrow_err!("Map keys must be required"));
+        }
+
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(arrow_err!("Map values cannot be repeated"));
+        }
+
+        // Extract the arrow fields
+        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
+            Some(DataType::Map(field, sorted)) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(arrow_err!(
+                            "Map data type should contain struct with two children, got {}",
+                            fields.len()
+                        ));
+                    }
+
+                    (Some(field), Some(&fields[0]), Some(&fields[1]), *sorted)
+                }
+                d => {
+                    return Err(arrow_err!(
+                        "Map data type should contain struct got {}",
+                        d
+                    ));
+                }
+            },
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected map got {}",
+                    d
+                ))
+            }
+            None => (None, None, None, false),
+        };
+
+        let maybe_key = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_key.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_key, context)?
+        };
+
+        let maybe_value = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_value.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_value, context)?
+        };
+
+        // Need both columns to be projected
+        match (maybe_key, maybe_value) {
+            (Some(key), Some(value)) => {
+                let key_field = convert_field(map_key, &key, arrow_key);
+                let value_field = convert_field(map_value, &value, arrow_value);
+
+                let map_field = Field::new(
+                    map_key_value.name(),
+                    DataType::Struct(vec![key_field, value_field]),
+                    nullable,
+                )
+                .with_metadata(arrow_map.and_then(|f| f.metadata().cloned()));
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type: DataType::Map(Box::new(map_field), sorted),
+                    field_type: ParquetFieldType::Group {
+                        children: vec![key, value],
+                    },
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn visit_list(
+        &mut self,
+        list_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if list_type.is_primitive() {
+            return Err(arrow_err!(
+                "{:?} is a list type and can't be processed as primitive.",
+                list_type
+            ));
+        }
+
+        let fields = list_type.get_fields();
+        if fields.len() != 1 {
+            return Err(arrow_err!(
+                "list type must have a single child, found {}",
+                fields.len()
+            ));
+        }
+
+        let repeated_field = &fields[0];
+        if get_repetition(repeated_field) != Repetition::REPEATED {
+            return Err(arrow_err!("List child must be repeated"));
+        }
+
+        // If the list is nullable
+        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => (context.def_level, false),
+            Repetition::OPTIONAL => (context.def_level + 1, true),
+            Repetition::REPEATED => {
+                return Err(arrow_err!("List type cannot be repeated"))
+            }
+        };
+
+        let arrow_field = match &context.data_type {
+            Some(DataType::List(f)) => Some(f.as_ref()),
+            Some(DataType::LargeList(f)) => Some(f.as_ref()),
+            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected list got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        if repeated_field.is_primitive() {
+            // If the repeated field is not a group, then its type is the element type and elements are required.
+            //
+            // required/optional group my_list (LIST) {
+            //   repeated int32 element;
+            // }
+            //
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_primitive(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    // visit_primitive will infer a non-nullable list, update if necessary
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        let items = repeated_field.get_fields();
+        if items.len() != 1
+            || repeated_field.name() == "array"
+            || repeated_field.name() == format!("{}_tuple", list_type.name())
+        {
+            // If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
+            //
+            // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name
+            // with _tuple appended then the repeated type is the element type and elements are required.
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_struct(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        // Regular list handling logic
+        let item_type = &items[0];
+        let rep_level = context.rep_level + 1;
+        let def_level = def_level + 1;
+
+        let new_context = VisitorContext {
+            def_level,
+            rep_level,
+            data_type: arrow_field.map(|f| f.data_type().clone()),
+        };
+
+        match self.dispatch(item_type, new_context) {
+            Ok(Some(item)) => {
+                let item_field = Box::new(convert_field(item_type, &item, arrow_field));
+
+                // Use arrow type as hint for index size
+                let arrow_type = match context.data_type {
+                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
+                    Some(DataType::FixedSizeList(_, len)) => {
+                        DataType::FixedSizeList(item_field, len)
+                    }
+                    _ => DataType::List(item_field),
+                };
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type,
+                    field_type: ParquetFieldType::Group {
+                        children: vec![item],
+                    },
+                }))
+            }
+            r => r,
+        }
+    }
+
+    /// Routes `cur_type` to the visit method that matches its kind
+    ///
+    /// Primitive types go to `visit_primitive`. Every other type is routed by
+    /// its converted type: `LIST` to `visit_list`, `MAP` and `MAP_KEY_VALUE`
+    /// to `visit_map`, and anything else to `visit_struct`
+    fn dispatch(
+        &mut self,
+        cur_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if cur_type.is_primitive() {
+            return self.visit_primitive(cur_type, context);
+        }
+
+        // Non-primitive types are told apart by their converted type annotation
+        let converted_type = cur_type.get_basic_info().converted_type();
+        match converted_type {
+            ConvertedType::LIST => self.visit_list(cur_type, context),
+            ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
+                self.visit_map(cur_type, context)
+            }
+            _ => self.visit_struct(cur_type, context),
+        }
+    }
+}
+
+/// Builds the arrow [`Field`] describing a child column
+///
+/// The name is taken from `parquet_type`, the data type and nullability from
+/// `field`, and the metadata (plus dictionary id and ordering when the data
+/// type is a dictionary) from `arrow_hint` if one is provided
+fn convert_field(
+    parquet_type: &Type,
+    field: &ParquetField,
+    arrow_hint: Option<&Field>,
+) -> Field {
+    let name = parquet_type.name();
+    let data_type = field.arrow_type.clone();
+    let nullable = field.nullable;
+
+    // Without a hint there is nothing to carry over
+    let hint = match arrow_hint {
+        Some(hint) => hint,
+        None => return Field::new(name, data_type, nullable),
+    };
+
+    // Dictionary metadata is only preserved when the inferred type is a
+    // dictionary and the hint supplies both the id and the ordering flag
+    let dict_info = match &data_type {
+        DataType::Dictionary(_, _) => hint.dict_id().zip(hint.dict_is_ordered()),
+        _ => None,
+    };
+
+    let converted = match dict_info {
+        Some((id, ordered)) => Field::new_dict(name, data_type, nullable, id, ordered),
+        None => Field::new(name, data_type, nullable),
+    };
+
+    converted.with_metadata(hint.metadata().cloned())
+}
+
+/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
+/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
+/// [`Schema`] embedded in the parquet metadata
+///
+/// Note: This does not support out of order column projection
+///
+/// # Errors
+///
+/// Returns an error if `leaf_columns` yields an index smaller than one it
+/// yielded earlier, or an index that is not less than `schema.num_columns()`.
+/// Errors raised while visiting the schema are propagated unchanged.
+pub fn convert_schema<T: IntoIterator<Item = usize>>(
+    schema: &SchemaDescriptor,
+    leaf_columns: T,
+    embedded_arrow_schema: Option<&Schema>,
+) -> Result<Option<ParquetField>> {
+    let num_columns = schema.num_columns();
+    let mut leaf_mask = vec![false; num_columns];
+    let mut last_idx = 0;
+    for i in leaf_columns {
+        if i < last_idx {
+            return Err(general_err!("out of order projection is not supported"));
+        }
+
+        // The indexes come from the caller, so use checked access and report
+        // an out of range index as an error rather than panicking
+        match leaf_mask.get_mut(i) {
+            Some(selected) => *selected = true,
+            None => {
+                return Err(general_err!(
+                    "leaf column index {} is out of bounds, schema has {} leaf columns",
+                    i,
+                    num_columns
+                ))
+            }
+        }
+        last_idx = i;
+    }
+
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: leaf_mask,
+    };
+
+    // The root is visited at level zero, with the embedded arrow schema (if
+    // any) presented to the visitor as a struct type hint
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+    };
+
+    visitor.dispatch(&schema.root_schema_ptr(), context)
+}
+
+/// Computes the [`ParquetField`] for the provided `parquet_type`
+///
+/// No arrow type hint is supplied, so the result is inferred from the parquet
+/// type alone.
+///
+/// # Errors
+///
+/// Propagates any error raised while visiting `parquet_type`, and returns an
+/// error if the visitor yields no field for it.
+pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
+    // NOTE(review): the mask holds a single entry, so this presumably expects
+    // a type with exactly one leaf column - confirm against the visitor
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: vec![true],
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: None,
+    };
+
+    // Report a missing field as an error instead of panicking on `None`
+    visitor.dispatch(parquet_type, context)?.ok_or_else(|| {
+        general_err!(
+            "parquet type {} did not produce a field",
+            parquet_type.name()
+        )
+    })
+}

Review Comment:
   It would be awesome to add tests specifically for this logic that enumerate parquet types and their expected conversions to arrow.
   
   Maybe that could be done as a follow-on PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org