Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/09 13:11:31 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request, #1682: Fix Parquet Arrow Schema Inference

tustvold opened a new pull request, #1682:
URL: https://github.com/apache/arrow-rs/pull/1682

   _Draft, as I still need to work out how to handle unsupported types (#1666)_
   
   # Which issue does this PR close?
   
   Closes #1655
   Closes #1663 
   Closes #1652 
   Closes #1654
   Closes #1681
   Closes #1680
   Closes #1666
   
   # Rationale for this change
   
   See tickets, in particular #1655 
   
   # What changes are included in this PR?
   
   This separates the schema inference logic from the logic that reads the parquet file. This makes the logic clearer, easier to test, and hopefully less buggy.
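   
   A minimal sketch of that split, using toy stand-in types (`Leaf`, `infer_schema`, `Reader` and `build_reader` here are hypothetical, not the crate's API). Inference is a pure function from schema plus projection to an optional schema, so it can be unit-tested without any file, and reader construction only consumes its output. Mirroring the new `build_array_reader`, an empty projection yields `None`, which maps to an empty reader.
   
   ```rust
   // Hypothetical stand-in for a parquet leaf column: (name, physical type).
   type Leaf = (&'static str, &'static str);
   
   /// Phase 1 (pure): infer arrow type names for the projected leaves.
   /// Returns `Ok(None)` when the projection selects no columns.
   fn infer_schema(leaves: &[Leaf], projection: &[usize]) -> Result<Option<Vec<String>>, String> {
       if projection.is_empty() {
           return Ok(None);
       }
       let mut fields = Vec::with_capacity(projection.len());
       for &i in projection {
           let (name, physical) = leaves
               .get(i)
               .ok_or_else(|| format!("column index {} out of bounds", i))?;
           // Toy mapping; unsupported types surface as errors, not panics.
           let arrow_type = match *physical {
               "BOOLEAN" => "Boolean",
               "INT32" => "Int32",
               "INT64" => "Int64",
               "DOUBLE" => "Float64",
               other => return Err(format!("unsupported physical type {}", other)),
           };
           fields.push(format!("{}: {}", name, arrow_type));
       }
       Ok(Some(fields))
   }
   
   /// Phase 2: build a (toy) reader from an already-inferred schema.
   #[derive(Debug, PartialEq)]
   enum Reader {
       Empty { num_rows: usize },
       Columns(Vec<String>),
   }
   
   fn build_reader(schema: Option<Vec<String>>, num_rows: usize) -> Reader {
       match schema {
           Some(fields) => Reader::Columns(fields),
           None => Reader::Empty { num_rows },
       }
   }
   
   fn main() {
       let leaves: [Leaf; 3] = [("a", "INT32"), ("b", "BYTE_ARRAY"), ("c", "DOUBLE")];
       let schema = infer_schema(&leaves, &[0, 2]).unwrap();
       println!("{:?}", build_reader(schema, 100));
   }
   ```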
   
   # Are there any user-facing changes?
   
   Yes, this changes the schema inference behaviour.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb merged pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb merged PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682




[GitHub] [arrow-rs] alamb commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872628593


##########
parquet/src/arrow/schema.rs:
##########
@@ -1128,6 +1128,24 @@ mod tests {
         for i in 0..arrow_fields.len() {
             assert_eq!(arrow_fields[i], converted_fields[i]);
         }
+
+        let err =
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 2, 4], None)
+                .unwrap_err()
+                .to_string();
+
+        assert!(

Review Comment:
   👍 





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872206833


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(

Review Comment:
   As there wasn't any "state" to associate with a builder, and shouldn't ever be, it seemed redundant to create an empty `Builder` struct?
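   
   To illustrate the point, a sketch assuming a simplified field tree (`Field` and `Reader` are hypothetical stand-ins, not the PR's types): when nothing has to be remembered between calls, a plain recursive function covers what a builder struct would, because each call receives everything it needs as arguments.
   
   ```rust
   /// Hypothetical inferred field tree.
   enum Field {
       Primitive { col_idx: usize },
       Struct(Vec<Field>),
   }
   
   /// Toy reader tree mirroring the field tree.
   #[derive(Debug, PartialEq)]
   enum Reader {
       Leaf(usize),
       Struct(Vec<Reader>),
   }
   
   /// Plain recursive function: no builder state is carried between calls.
   fn build_reader(field: &Field) -> Reader {
       match field {
           Field::Primitive { col_idx } => Reader::Leaf(*col_idx),
           Field::Struct(children) => Reader::Struct(children.iter().map(build_reader).collect()),
       }
   }
   
   fn main() {
       let schema = Field::Struct(vec![
           Field::Primitive { col_idx: 0 },
           Field::Struct(vec![Field::Primitive { col_idx: 1 }]),
       ]);
       println!("{:?}", build_reader(&schema));
   }
   ```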





[GitHub] [arrow-rs] codecov-commenter commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1123301035

   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1682?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1682](https://codecov.io/gh/apache/arrow-rs/pull/1682?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e2f12de) into [master](https://codecov.io/gh/apache/arrow-rs/commit/e02869a5b4398b1adece3337099324585fc902c2?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e02869a) will **increase** coverage by `0.10%`.
   > The diff coverage is `80.62%`.
   
   > :exclamation: Current head e2f12de differs from pull request most recent head 5fd8cd8. Consider uploading reports for the commit 5fd8cd8 to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1682      +/-   ##
   ==========================================
   + Coverage   83.15%   83.25%   +0.10%     
   ==========================================
     Files         193      195       +2     
     Lines       56007    56049      +42     
   ==========================================
   + Hits        46572    46665      +93     
   + Misses       9435     9384      -51     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/errors.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZXJyb3JzLnJz) | `29.62% <ø> (ø)` | |
   | [parquet/src/arrow/schema/complex.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvc2NoZW1hL2NvbXBsZXgucnM=) | `73.81% <73.81%> (ø)` | |
   | [parquet/src/arrow/schema/primitive.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvc2NoZW1hL3ByaW1pdGl2ZS5ycw==) | `76.99% <76.99%> (ø)` | |
   | [parquet/src/arrow/array\_reader/builder.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyL2J1aWxkZXIucnM=) | `93.50% <91.93%> (+24.53%)` | :arrow_up: |
   | [parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvc2NoZW1hLnJz) | `96.76% <92.50%> (+10.98%)` | :arrow_up: |
   | [parquet/src/arrow/array\_reader/list\_array.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyL2xpc3RfYXJyYXkucnM=) | `93.35% <100.00%> (+0.07%)` | :arrow_up: |
   | [parquet/src/arrow/arrow\_writer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfd3JpdGVyLnJz) | `97.66% <100.00%> (ø)` | |
   | [parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3R5cGVzLnJz) | `83.83% <0.00%> (-1.85%)` | :arrow_down: |
   | [arrow/src/datatypes/datatype.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9kYXRhdHlwZS5ycw==) | `65.09% <0.00%> (-1.71%)` | :arrow_down: |
   | [parquet/src/schema/visitor.rs](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3Zpc2l0b3IucnM=) | `66.66% <0.00%> (-1.34%)` | :arrow_down: |
   | ... and [11 more](https://codecov.io/gh/apache/arrow-rs/pull/1682/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1682?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1682?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [e02869a...5fd8cd8](https://codecov.io/gh/apache/arrow-rs/pull/1682?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [arrow-rs] alamb commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r871336584


##########
parquet/src/arrow/arrow_writer.rs:
##########
@@ -1058,7 +1058,7 @@ mod tests {
         let stocks_field = Field::new(
             "stocks",
             DataType::Map(
-                Box::new(Field::new("entries", entries_struct_type, false)),
+                Box::new(Field::new("entries", entries_struct_type, true)),

Review Comment:
   Why this change?



##########
parquet/src/arrow/schema.rs:
##########
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
             .with_repetition(repetition)
             .build(),
-        DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
-            name,
-            PhysicalType::INT64,
-        )
-        .with_logical_type(Some(LogicalType::Timestamp {
-            is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
-            unit: match time_unit {
-                TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
-                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
-            },
-        }))
-        .with_repetition(repetition)
-        .build(),
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_repetition(repetition)
+                .build()
+        }
+        DataType::Timestamp(time_unit, _) => {
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_logical_type(Some(LogicalType::Timestamp {
+                    is_adjusted_to_u_t_c: false,
+                    unit: match time_unit {
+                        TimeUnit::Second => unreachable!(),
+                        TimeUnit::Millisecond => {
+                            ParquetTimeUnit::MILLIS(Default::default())
+                        }
+                        TimeUnit::Microsecond => {
+                            ParquetTimeUnit::MICROS(Default::default())
+                        }
+                        TimeUnit::Nanosecond => {
+                            ParquetTimeUnit::NANOS(Default::default())
+                        }
+                    },
+                }))
+                .with_repetition(repetition)
+                .build()
+        }
         DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        // date64 is cast to date32
+        // date64 is cast to date32 (#1666)
         DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
+        DataType::Time32(TimeUnit::Second) => {
+            // Cannot represent seconds in LogicalType

Review Comment:
   Just confirming: this is a (seemingly better) change in behavior, right? Now no logical type is stored for arrow `Time32(seconds)`, whereas previously a logical type of `Time(millis)` was stored.
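   
   For reference, a sketch of the new writer-side unit mapping for timestamps, using simplified stand-in enums rather than the arrow and parquet crate types. The removed code mapped `TimeUnit::Second` to `MILLIS`, so a seconds value would have been labelled as milliseconds; the new arms write a bare INT64 instead and rely on the embedded arrow schema (the `Timestamp` case in `apply_hint`) to restore the unit on read. The `Time32` seconds case follows the same idea.
   
   ```rust
   /// Simplified stand-ins (not the real arrow / parquet types).
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum ArrowTimeUnit { Second, Millisecond, Microsecond, Nanosecond }
   
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum ParquetTimeUnit { Millis, Micros, Nanos }
   
   /// Unit recorded in the parquet Timestamp LogicalType for an arrow timestamp.
   /// Seconds have no LogicalType equivalent, so `None` means "write a bare INT64".
   fn timestamp_logical_unit(unit: ArrowTimeUnit) -> Option<ParquetTimeUnit> {
       match unit {
           ArrowTimeUnit::Second => None,
           ArrowTimeUnit::Millisecond => Some(ParquetTimeUnit::Millis),
           ArrowTimeUnit::Microsecond => Some(ParquetTimeUnit::Micros),
           ArrowTimeUnit::Nanosecond => Some(ParquetTimeUnit::Nanos),
       }
   }
   
   fn main() {
       let units = [
           ArrowTimeUnit::Second,
           ArrowTimeUnit::Millisecond,
           ArrowTimeUnit::Microsecond,
           ArrowTimeUnit::Nanosecond,
       ];
       for unit in units {
           println!("{:?} -> {:?}", unit, timestamp_logical_unit(unit));
       }
   }
   ```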



##########
parquet/src/arrow/schema.rs:
##########
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
+        // date64 is cast to date32 (#1666)

Review Comment:
   👍 https://github.com/apache/arrow-rs/issues/1666
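   
   For context on the #1666 cast, a sketch of the underlying arithmetic: arrow `Date64` stores milliseconds since the UNIX epoch, while `Date32` (like the parquet DATE logical type) stores days since the epoch. Flooring with `div_euclid` is an assumption of this sketch, not necessarily what the writer does. The point is that sub-day milliseconds are lost, so the reader can restore the `Date64` type via the hint but not the discarded time of day.
   
   ```rust
   const MILLIS_PER_DAY: i64 = 86_400_000;
   
   /// Date64 (ms since epoch) -> Date32 (days since epoch).
   /// `div_euclid` floors, so instants before 1970 map to the day they fall in.
   /// Sub-day milliseconds are discarded; day counts outside the i32 range
   /// would be truncated by the `as` cast (not handled in this sketch).
   fn date64_to_date32(millis: i64) -> i32 {
       millis.div_euclid(MILLIS_PER_DAY) as i32
   }
   
   /// Date32 -> Date64: always lands on midnight of that day.
   fn date32_to_date64(days: i32) -> i64 {
       i64::from(days) * MILLIS_PER_DAY
   }
   
   fn main() {
       let written = 86_400_000 + 5; // 1970-01-02 plus 5 ms
       let stored = date64_to_date32(written);
       let read_back = date32_to_date64(stored);
       println!("{} -> {} -> {}", written, stored, read_back);
   }
   ```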



##########
parquet/src/arrow/schema.rs:
##########
@@ -1679,7 +1168,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)

Review Comment:
   I didn't see a test for the (new) error case; I suggest adding one so we don't get accidental regressions.
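   
   As an example of such a test, a sketch under an assumption: judging from the tests in this PR (`vec![3, 2, 4]` now errors, and `vec![3, 4, 0]` was reordered to `vec![0, 3, 4]`), projections appear to require strictly increasing leaf indices. `validate_projection` is a hypothetical helper encoding that assumed rule, not a function from the PR.
   
   ```rust
   /// Hypothetical check: leaf column indices must be in bounds and strictly
   /// increasing (assumed rule, inferred from the PR's tests).
   fn validate_projection(indices: &[usize], num_columns: usize) -> Result<(), String> {
       for (pos, &idx) in indices.iter().enumerate() {
           if idx >= num_columns {
               return Err(format!("column index {} out of bounds for {} columns", idx, num_columns));
           }
           if pos > 0 && indices[pos - 1] >= idx {
               return Err(format!("column indices must be strictly increasing, got {:?}", indices));
           }
       }
       Ok(())
   }
   
   fn main() {
       // Regression-style checks of the kind requested above.
       assert!(validate_projection(&[0, 3, 4], 5).is_ok());
       assert!(validate_projection(&[3, 2, 4], 5).is_err());
       assert!(validate_projection(&[0, 5], 5).is_err());
       println!("projection checks passed");
   }
   ```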



##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory.
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses a type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
+    match (&parquet, &hint) {
+        // Not all time units can be represented as LogicalType / ConvertedType
+        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
+        (DataType::Int32, DataType::Time32(_)) => hint,
+        (DataType::Int64, DataType::Time64(_)) => hint,
+
+        // Date64 doesn't have a corresponding LogicalType / ConvertedType
+        (DataType::Int64, DataType::Date64) => hint,
+
+        // Coerce Date32 back to Date64 (#1666)
+        (DataType::Date32, DataType::Date64) => hint,
+
+        // Determine timezone
+        (DataType::Timestamp(p, None), DataType::Timestamp(h, Some(_))) if p == h => hint,
+
+        // Determine offset size
+        (DataType::Utf8, DataType::LargeUtf8) => hint,
+        (DataType::Binary, DataType::LargeBinary) => hint,
+
+        // Determine interval time unit (#1666)
+        (DataType::Interval(_), DataType::Interval(_)) => hint,
+
+        // Potentially preserve dictionary encoding
+        (_, DataType::Dictionary(_, value)) => {
+            // Apply hint to inner type
+            let hinted = apply_hint(parquet, value.as_ref().clone());
+
+            // If matches dictionary value - preserve dictionary
+            // otherwise use hinted inner type
+            match &hinted == value.as_ref() {
+                true => hint,
+                false => hinted,
+            }
+        }
+        _ => parquet,
+    }
+}
+
+fn from_parquet(parquet_type: &Type) -> Result<DataType> {
+    match parquet_type {
+        Type::PrimitiveType {
+            physical_type,
+            basic_info,
+            type_length,
+            scale,
+            precision,
+            ..
+        } => match physical_type {
+            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
+            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
+            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+            PhysicalType::FLOAT => Ok(DataType::Float32),
+            PhysicalType::DOUBLE => Ok(DataType::Float64),
+            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
+            }
+        },
+        Type::GroupType { .. } => unreachable!(),
+    }
+}
+
+fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+    let scale = scale
+        .try_into()
+        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+    let precision = precision
+        .try_into()
+        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
+
+    Ok(DataType::Decimal(precision, scale))
+}
+
+fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int32),
+        (
+            Some(
+                ref t @ LogicalType::Integer {
+                    bit_width,
+                    is_signed,
+                },
+            ),
+            _,
+        ) => match (bit_width, is_signed) {
+            (8, true) => Ok(DataType::Int8),
+            (16, true) => Ok(DataType::Int16),
+            (32, true) => Ok(DataType::Int32),
+            (8, false) => Ok(DataType::UInt8),
+            (16, false) => Ok(DataType::UInt16),
+            (32, false) => Ok(DataType::UInt32),
+            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
+        },
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+            _ => Err(arrow_err!(
+                "Cannot create INT32 physical type from {:?}",
+                unit
+            )),
+        },
+        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
+        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
+        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
+        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
+        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
+        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
+        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
+        (None, ConvertedType::DATE) => Ok(DataType::Date32),
+        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int64),
+        (
+            Some(LogicalType::Integer {
+                bit_width,
+                is_signed,
+            }),
+            _,
+        ) if bit_width == 64 => match is_signed {
+            true => Ok(DataType::Int64),
+            false => Ok(DataType::UInt64),
+        },
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => {
+                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
+            }
+            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
+        },
+        (
+            Some(LogicalType::Timestamp {
+                is_adjusted_to_u_t_c,
+                unit,
+            }),
+            _,
+        ) => Ok(DataType::Timestamp(
+            match unit {
+                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
+                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
+                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
+            },
+            if is_adjusted_to_u_t_c {
+                Some("UTC".to_string())
+            } else {
+                None
+            },
+        )),
+        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
+        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
+        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+        (None, ConvertedType::TIMESTAMP_MILLIS) => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        (None, ConvertedType::TIMESTAMP_MICROS) => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
+        (Some(LogicalType::Json), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
+        (None, ConvertedType::NONE) => Ok(DataType::Binary),
+        (None, ConvertedType::JSON) => Ok(DataType::Binary),
+        (None, ConvertedType::BSON) => Ok(DataType::Binary),
+        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
+        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_fixed_len_byte_array(
+    info: &BasicTypeInfo,
+    scale: i32,
+    precision: i32,
+    type_length: i32,
+) -> Result<DataType> {
+    // TODO: This should check the type length for the decimal and interval types
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::INTERVAL) => {
+            // There is currently no reliable way of determining which IntervalUnit
+            // to return. Thus without the original Arrow schema, the results
+            // would be incorrect if all 12 bytes of the interval are populated
+            Ok(DataType::Interval(IntervalUnit::DayTime))
+        }
+        _ => Ok(DataType::FixedSizeBinary(type_length)),
+    }
+}

Review Comment:
   Also here: some explicit tests showing the conversions, as a way to document the expected behavior, would be really nice.
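   
   A sketch of what such a table-driven test could look like, using a stand-in `DataType` enum and a hypothetical `int32_from_integer` function that copies the `LogicalType::Integer` arm of `from_int32` above. A real test would instead call the crate's conversion on constructed parquet types.
   
   ```rust
   /// Stand-in for the arrow integer DataType variants (not the arrow crate's enum).
   #[derive(Debug, PartialEq)]
   enum DataType { Int8, Int16, Int32, UInt8, UInt16, UInt32 }
   
   /// Copy of the `LogicalType::Integer` arm of `from_int32`.
   fn int32_from_integer(bit_width: i8, is_signed: bool) -> Result<DataType, String> {
       match (bit_width, is_signed) {
           (8, true) => Ok(DataType::Int8),
           (16, true) => Ok(DataType::Int16),
           (32, true) => Ok(DataType::Int32),
           (8, false) => Ok(DataType::UInt8),
           (16, false) => Ok(DataType::UInt16),
           (32, false) => Ok(DataType::UInt32),
           _ => Err(format!("Cannot create INT32 from Integer({}, {})", bit_width, is_signed)),
       }
   }
   
   fn main() {
       // Each case documents one expected conversion.
       let cases = [
           ((8, true), Some(DataType::Int8)),
           ((16, false), Some(DataType::UInt16)),
           ((32, false), Some(DataType::UInt32)),
           ((64, true), None), // 64-bit integers belong on INT64
       ];
       for ((width, signed), expected) in cases {
           assert_eq!(int32_from_integer(width, signed).ok(), expected);
       }
       println!("integer conversions behave as documented");
   }
   ```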



##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory,
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses a type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {

Review Comment:
   I like this centralization of hinting logic -- it makes it easy to understand where arrow and parquet type systems aren't compatible
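To illustrate why a single hint function helps, here is a sketch of the kind of rules such a function centralizes. The mini `DataType` enum and the specific rules are assumptions chosen for illustration. They are drawn from cases visible elsewhere in this diff (interval unit, INT96 timezone, dictionary-encoded byte arrays) and are not the PR's actual `apply_hint` rule set.

```rust
// Hypothetical mini type system; not arrow's real `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum IntervalUnit {
    DayTime,
    YearMonth,
}

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Interval(IntervalUnit),
    // Only the optional timezone is modelled; the time unit is omitted.
    Timestamp(Option<String>),
    Dictionary(Box<DataType>, Box<DataType>),
}

/// Returns true when the hint refines `parquet` with information the
/// parquet schema alone cannot carry (illustrative rules only).
fn hint_is_compatible(parquet: &DataType, hint: &DataType) -> bool {
    match (parquet, hint) {
        // The interval unit cannot be recovered from the 12-byte physical type.
        (DataType::Interval(_), DataType::Interval(_)) => true,
        // The timezone comes from the arrow schema, as the INT96 reader in this diff does.
        (DataType::Timestamp(_), DataType::Timestamp(_)) => true,
        // Dictionary encoding is an arrow-side choice; the value type must match.
        (_, DataType::Dictionary(_, value)) => **value == *parquet,
        _ => false,
    }
}

/// Single place where the embedded arrow schema may override inference.
fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
    if hint_is_compatible(&parquet, &hint) {
        hint
    } else {
        // Incompatible hint: keep the type inferred from the parquet schema.
        parquet
    }
}
```

Because every override goes through one predicate, the list of places where the two type systems disagree can be read off a single `match`.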



##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -1050,6 +1050,41 @@ mod tests {
         for batch in record_batch_reader {
             batch.unwrap();
         }
+
+        let projected_reader = arrow_reader

Review Comment:
   ```suggestion
           // Test for https://github.com/apache/arrow-rs/issues/1654 and
           // https://github.com/apache/arrow-rs/issues/1652
           let projected_reader = arrow_reader
   ```



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(

Review Comment:
   An `ArrayReaderBuilder` seems like a more 'common' interface to me, but free functions are fine too, and since this stuff isn't public we can always change it in the future if we want to
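For comparison, here is a sketch of the free-function shape this diff moves to, with a thin builder layered on top. All types here (`Field`, `Reader`, `ReaderBuilder`) are hypothetical stand-ins for `ParquetField` and `Box<dyn ArrayReader>`. The point is only that stateless recursive functions can later be wrapped by a builder without modification, so either interface remains an option while the API is private.

```rust
// Hypothetical, cut-down schema tree; not the PR's real `ParquetField`.
enum Field {
    Primitive { col_idx: usize },
    Struct(Vec<Field>),
    List(Box<Field>),
}

// Stand-in for `Box<dyn ArrayReader>`: records which reader would be built.
#[derive(Debug, PartialEq)]
enum Reader {
    Primitive(usize),
    Struct(Vec<Reader>),
    List(Box<Reader>),
}

/// Free-function style, as in the new `build_reader`: all inputs are
/// parameters, so recursion needs no `&mut self` or context struct.
fn build_reader(field: &Field) -> Reader {
    match field {
        Field::Primitive { col_idx } => Reader::Primitive(*col_idx),
        Field::Struct(children) => Reader::Struct(children.iter().map(build_reader).collect()),
        Field::List(item) => Reader::List(Box::new(build_reader(item))),
    }
}

/// A builder can still be added later without touching the free
/// functions, since it only needs to forward to them.
struct ReaderBuilder;

impl ReaderBuilder {
    fn build(&self, root: &Field) -> Reader {
        build_reader(root)
    }
}
```

The trade-off is mostly ergonomic. A builder is a natural home for shared configuration, while the free functions make each step's inputs explicit and easy to test in isolation.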



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),
+    ));
 
-        let arrow_type: Option<ArrowType> = self
-            .get_arrow_field(&cur_type, context)
-            .map(|f| f.data_type().clone());
+    let page_iterator = row_groups.column_chunks(col_idx)?;
+    let null_mask_only = field.def_level == 1 && field.nullable;
+    let arrow_type = Some(field.arrow_type.clone());
 
-        match cur_type.get_physical_type() {
-            PhysicalType::BOOLEAN => Ok(Box::new(
-                PrimitiveArrayReader::<BoolType>::new_with_options(
+    match physical_type {
+        PhysicalType::BOOLEAN => Ok(Box::new(
+            PrimitiveArrayReader::<BoolType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT32 => {
+            if let Some(DataType::Null) = arrow_type {
+                Ok(Box::new(NullArrayReader::<Int32Type>::new(
                     page_iterator,
                     column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT32 => {
-                if let Some(ArrowType::Null) = arrow_type {
-                    Ok(Box::new(NullArrayReader::<Int32Type>::new(
+                )?))
+            } else {
+                Ok(Box::new(
+                    PrimitiveArrayReader::<Int32Type>::new_with_options(
                         page_iterator,
                         column_desc,
-                    )?))
+                        arrow_type,
+                        null_mask_only,
+                    )?,
+                ))
+            }
+        }
+        PhysicalType::INT64 => Ok(Box::new(
+            PrimitiveArrayReader::<Int64Type>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT96 => {
+            // get the optional timezone information from arrow type
+            let timezone = arrow_type.as_ref().and_then(|data_type| {
+                if let DataType::Timestamp(_, tz) = data_type {
+                    tz.clone()
                 } else {
-                    Ok(Box::new(
-                        PrimitiveArrayReader::<Int32Type>::new_with_options(
-                            page_iterator,
-                            column_desc,
-                            arrow_type,
-                            null_mask_only,
-                        )?,
-                    ))
+                    None
                 }
-            }
-            PhysicalType::INT64 => Ok(Box::new(
-                PrimitiveArrayReader::<Int64Type>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT96 => {
-                // get the optional timezone information from arrow type
-                let timezone = arrow_type.as_ref().and_then(|data_type| {
-                    if let ArrowType::Timestamp(_, tz) = data_type {
-                        tz.clone()
-                    } else {
-                        None
-                    }
-                });
-                let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            });
+            let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            Ok(Box::new(ComplexObjectArrayReader::<
+                Int96Type,
+                Int96Converter,
+            >::new(
+                page_iterator,
+                column_desc,
+                converter,
+                arrow_type,
+            )?))
+        }
+        PhysicalType::FLOAT => Ok(Box::new(
+            PrimitiveArrayReader::<FloatType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::DOUBLE => Ok(Box::new(
+            PrimitiveArrayReader::<DoubleType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::BYTE_ARRAY => match arrow_type {
+            Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+            _ => make_byte_array_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+        },
+        PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
+            DataType::Decimal(precision, scale) => {
+                let converter = DecimalConverter::new(DecimalArrayConverter::new(
+                    precision as i32,
+                    scale as i32,
+                ));
                 Ok(Box::new(ComplexObjectArrayReader::<
-                    Int96Type,
-                    Int96Converter,
+                    FixedLenByteArrayType,
+                    DecimalConverter,
                 >::new(
                     page_iterator,
                     column_desc,
                     converter,
                     arrow_type,
                 )?))
             }
-            PhysicalType::FLOAT => Ok(Box::new(
-                PrimitiveArrayReader::<FloatType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::DOUBLE => Ok(Box::new(
-                PrimitiveArrayReader::<DoubleType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::BYTE_ARRAY => match arrow_type {
-                Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+            DataType::Interval(IntervalUnit::DayTime) => {
+                let converter =
+                    IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-                _ => make_byte_array_reader(
+                )?))
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                let converter =
+                    IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-            },
-            PhysicalType::FIXED_LEN_BYTE_ARRAY
-                if cur_type.get_basic_info().converted_type()
-                    == ConvertedType::DECIMAL =>
-            {
-                let converter = DecimalConverter::new(DecimalArrayConverter::new(
-                    cur_type.get_precision(),
-                    cur_type.get_scale(),
-                ));
+                )?))
+            }
+            _ => {
+                let converter =
+                    FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
                 Ok(Box::new(ComplexObjectArrayReader::<
                     FixedLenByteArrayType,
-                    DecimalConverter,
+                    FixedLenBinaryConverter,
                 >::new(
                     page_iterator,
                     column_desc,
                     converter,
                     arrow_type,
                 )?))
             }
-            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                let byte_width = match *cur_type {
-                    Type::PrimitiveType {
-                        ref type_length, ..
-                    } => *type_length,
-                    _ => {
-                        return Err(ArrowError(
-                            "Expected a physical type, not a group type".to_string(),
-                        ))
-                    }
-                };
-                if cur_type.get_basic_info().converted_type() == ConvertedType::INTERVAL {
-                    if byte_width != 12 {
-                        return Err(ArrowError(format!(
-                            "Parquet interval type should have length of 12, found {}",
-                            byte_width
-                        )));
-                    }
-                    match arrow_type {
-                        Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
-                            let converter = IntervalDayTimeConverter::new(
-                                IntervalDayTimeArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
-                            let converter = IntervalYearMonthConverter::new(
-                                IntervalYearMonthArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(t) => Err(ArrowError(format!(
-                            "Cannot write a Parquet interval to {:?}",
-                            t
-                        ))),
-                        None => {
-                            // we do not support an interval not matched to an Arrow type,
-                            // because we risk data loss as we won't know which of the 12 bytes
-                            // are or should be populated
-                            Err(ArrowError(
-                                "Cannot write a Parquet interval with no Arrow type specified.
-                                There is a risk of data loss as Arrow either supports YearMonth or
-                                DayTime precision. Without the Arrow type, we cannot infer the type.
-                                ".to_string()
-                            ))
-                        }
-                    }
-                } else {
-                    let converter = FixedLenBinaryConverter::new(
-                        FixedSizeArrayConverter::new(byte_width),
-                    );
-                    Ok(Box::new(ComplexObjectArrayReader::<
-                        FixedLenByteArrayType,
-                        FixedLenBinaryConverter,
-                    >::new(
-                        page_iterator,
-                        column_desc,
-                        converter,
-                        arrow_type,
-                    )?))
-                }
-            }
-        }
-    }
-
-    /// Constructs struct array reader without considering repetition.
-    fn build_for_struct_type_inner(
-        &mut self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut fields = Vec::with_capacity(cur_type.get_fields().len());
-        let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
-
-        for child in cur_type.get_fields() {
-            if let Some(child_reader) = self.dispatch(child.clone(), context)? {
-                // TODO: this results in calling get_arrow_field twice, it could be reused
-                // from child_reader above, by making child_reader carry its `Field`
-                let mut struct_context = context.clone();
-                struct_context.path.append(vec![child.name().to_string()]);
-                let field = match self.get_arrow_field(child, &struct_context) {
-                    Some(f) => f.clone(),
-                    _ => Field::new(
-                        child.name(),
-                        child_reader.get_data_type().clone(),
-                        child.is_optional(),
-                    ),
-                };
-                fields.push(field);
-                children_reader.push(child_reader);
-            }
-        }
-
-        if !fields.is_empty() {
-            let arrow_type = ArrowType::Struct(fields);
-            Ok(Some(Box::new(StructArrayReader::new(
-                arrow_type,
-                children_reader,
-                context.def_level,
-                context.rep_level,
-            ))))
-        } else {
-            Ok(None)
-        }
+        },
     }
+}
 
-    fn get_arrow_field(
-        &self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Option<&Field> {
-        let parts: Vec<&str> = context
-            .path
-            .parts()
-            .iter()
-            .map(|x| -> &str { x })
-            .collect::<Vec<&str>>();
-
-        // If the parts length is one it'll have the top level "schema" type. If
-        // it's two then it'll be a top-level type that we can get from the arrow
-        // schema directly.
-        if parts.len() <= 2 {
-            self.arrow_schema.field_with_name(cur_type.name()).ok()
-        } else {
-            // If it's greater than two then we need to traverse the type path
-            // until we find the actual field we're looking for.
-            let mut field: Option<&Field> = None;
-
-            for (i, part) in parts.iter().enumerate().skip(1) {
-                if i == 1 {
-                    field = self.arrow_schema.field_with_name(part).ok();
-                } else if let Some(f) = field {
-                    match f.data_type() {
-                        ArrowType::Struct(fields) => {
-                            field = fields.iter().find(|f| f.name() == part)
-                        }
-                        ArrowType::List(list_field) => match list_field.data_type() {
-                            ArrowType::Struct(fields) => {
-                                field = fields.iter().find(|f| f.name() == part)
-                            }
-                            _ => field = Some(list_field.as_ref()),
-                        },
-                        _ => field = None,
-                    }
-                } else {
-                    field = None;
-                }
-            }
-            field
-        }
-    }
+/// Constructs struct array reader without considering repetition.

Review Comment:
   I don't understand why it wouldn't consider repetition (aka I don't understand why this piece of information is valuable -- not to say it isn't valuable, but I don't understand enough to know why it is). Perhaps some more comments would help.
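   For context on the question above: in the code being removed, `visit_struct` applies the struct's own `Repetition` before delegating. It bumps `def_level` for `OPTIONAL` and rejects `REPEATED`, so `build_for_struct_type_inner` receives levels that already account for the struct's repetition. Below is a minimal standalone sketch of that split. `Context`, `StructReader` and the simplified `Repetition` enum are hypothetical stand-ins, not the crate's types.

```rust
/// Parquet field repetition (hypothetical stand-in for `crate::basic::Repetition`).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Levels inherited from the parent (hypothetical, cf. `ArrayReaderBuilderContext`).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Context {
    def_level: i16,
    rep_level: i16,
}

/// Hypothetical stand-in for a built struct reader; it only records the
/// levels it was constructed with.
#[derive(Debug, PartialEq)]
struct StructReader {
    def_level: i16,
    rep_level: i16,
}

/// Outer step (cf. `visit_struct`): applies the struct's *own* repetition
/// to the inherited levels, then delegates to the inner step.
fn visit_struct(repetition: Repetition, ctx: Context) -> Result<StructReader, String> {
    let mut new_ctx = ctx;
    match repetition {
        Repetition::Repeated => {
            return Err("Reading repeated struct is not supported yet!".to_string())
        }
        // A nullable struct needs one extra definition level so that a null
        // struct can be told apart from a present one
        Repetition::Optional => new_ctx.def_level += 1,
        Repetition::Required => {}
    }
    Ok(build_struct_inner(new_ctx))
}

/// Inner step (cf. `build_for_struct_type_inner`): trusts the levels it is
/// given and never inspects repetition itself.
fn build_struct_inner(ctx: Context) -> StructReader {
    StructReader {
        def_level: ctx.def_level,
        rep_level: ctx.rep_level,
    }
}
```

   In other words, "without considering repetition" means the caller has already folded the struct's repetition into the levels it passes in.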



##########
parquet/src/arrow/schema.rs:
##########
@@ -1261,7 +746,7 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
+                DataType::List(Box::new(Field::new("str", DataType::Utf8, false))),

Review Comment:
   I think the comments need to be updated to match the changes in the code:
   
   ```rust
           // // List<String> (list nullable, elements non-null)
           // optional group my_list (LIST) {
           //   repeated group element {
           //     required binary str (UTF8);
           //   };
           // }
   ```
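   The schema in that comment lines up with the new expectation in the test: the arrow item field is named after the leaf `str` and is non-nullable because `str` is `required`, while the list itself is nullable because `my_list` is `optional`. A minimal sketch of just that mapping follows. `PNode` and `list_item` are hypothetical, the middle child is assumed to be `repeated`, and only this three-level layout is handled; the parquet backward-compatibility rules for other list encodings are not modelled.

```rust
/// Hypothetical, simplified parquet node (no physical or logical types).
struct PNode {
    name: &'static str,
    optional: bool,
    children: Vec<PNode>,
}

/// For a three-level LIST group `<list> { repeated group <x> { <element> } }`,
/// returns (list nullable, arrow item name, item nullable).
fn list_item(list: &PNode) -> Option<(bool, &'static str, bool)> {
    // The LIST group has a single (assumed repeated) child...
    if list.children.len() != 1 {
        return None;
    }
    let repeated = &list.children[0];
    // ...which in turn wraps exactly one element field
    if repeated.children.len() != 1 {
        return None;
    }
    let element = &repeated.children[0];
    // The arrow item takes the element's name and nullability,
    // not those of the repeated group
    Some((list.optional, element.name, element.optional))
}
```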



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),

Review Comment:
   ```suggestion
           // We don't track the column path in ParquetField as it adds a potential source of bugs when the arrow
           // mapping converts more than one level in the parquet schema into a single arrow field.
           //
           // None of the readers actually use this field, but it is required for this type, so just stick a placeholder in.
           ColumnPath::new(vec![]),
   ```



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    pub rep_level: i16,
+    pub def_level: i16,
+    pub nullable: bool,
+    pub arrow_type: DataType,
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    pub fn children(&self) -> Option<&[ParquetField]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        col_idx: usize,
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to extract the
+/// necessary information to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {

Review Comment:
   I really like this structure to encapsulate the logic for using the embedded schema. 👍 
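   The core of the `Visitor` above is a depth-first walk that hands out column indices to leaves in schema order and drops anything outside `column_mask`, pruning groups that end up with no children. A minimal sketch of just that part, assuming a simplified schema tree: `Node` and `Projected` are hypothetical stand-ins for `Type` and `ParquetField`, and levels, nullability and arrow types are left out.

```rust
/// Hypothetical, simplified parquet schema node.
enum Node {
    Leaf(&'static str),
    Group(&'static str, Vec<Node>),
}

/// Hypothetical, simplified stand-in for `ParquetField`.
#[derive(Debug, PartialEq)]
enum Projected {
    Primitive { name: &'static str, col_idx: usize },
    Group { name: &'static str, children: Vec<Projected> },
}

struct Visitor {
    /// The column index of the next leaf column
    next_col_idx: usize,
    /// Mask of columns to include, indexed by leaf column index
    column_mask: Vec<bool>,
}

impl Visitor {
    fn dispatch(&mut self, node: &Node) -> Option<Projected> {
        match node {
            Node::Leaf(name) => {
                // Every leaf consumes an index, even when masked out, so indices
                // stay aligned with the parquet leaf column order
                let col_idx = self.next_col_idx;
                self.next_col_idx += 1;
                if !self.column_mask[col_idx] {
                    return None;
                }
                Some(Projected::Primitive { name: *name, col_idx })
            }
            Node::Group(name, children) => {
                // Visit all children in order; excluded ones still advance the index
                let kept: Vec<Projected> =
                    children.iter().filter_map(|child| self.dispatch(child)).collect();
                // A group with no selected leaves is pruned entirely
                if kept.is_empty() {
                    None
                } else {
                    Some(Projected::Group { name: *name, children: kept })
                }
            }
        }
    }
}
```

   Because masked-out leaves still advance `next_col_idx`, a projection can be expressed purely in terms of parquet leaf indices without a separate pass to compute them.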



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    /// The level which represents an insertion into the current list
+    /// i.e. guaranteed to be > 0 for a list type
+    pub rep_level: i16,
+    /// The level at which this field is fully defined,
+    /// i.e. guaranteed to be > 0 for a nullable type
+    pub def_level: i16,
+    /// Whether this field is nullable
+    pub nullable: bool,
+    /// The arrow type of the column data
+    ///
+    /// Note: In certain cases the data stored in parquet may have been coerced
+    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
+    pub arrow_type: DataType,
+    /// The type of this field
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    /// Converts `self` into an arrow list, with its current type as the field type
+    ///
+    /// This is used to convert repeated columns, into their arrow representation
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    /// Returns a list of [`ParquetField`] children if this is a group type
+    pub fn children(&self) -> Option<&[Self]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        /// The index of the column in parquet
+        col_idx: usize,
+        /// The type of the column in parquet
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+/// Encodes the context of the parent of the field currently under consideration
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    /// Compute the resulting definition level, repetition level and nullability
+    /// for a child field with the given [`Repetition`]
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {
+    fn visit_primitive(
+        &mut self,
+        primitive_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let col_idx = self.next_col_idx;
+        self.next_col_idx += 1;
+
+        if !self.column_mask[col_idx] {
+            return Ok(None);
+        }
+
+        let repetition = get_repetition(primitive_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
+
+        let primitive_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type,
+            field_type: ParquetFieldType::Primitive {
+                primitive_type: primitive_type.clone(),
+                col_idx,
+            },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
+            _ => primitive_field,
+        }))
+    }
+
+    fn visit_struct(
+        &mut self,
+        struct_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // The root type will not have a repetition level
+        let repetition = get_repetition(struct_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let parquet_fields = struct_type.get_fields();
+
+        // Extract the arrow fields
+        let arrow_fields = match &context.data_type {
+            Some(DataType::Struct(fields)) => {
+                if fields.len() != parquet_fields.len() {
+                    return Err(arrow_err!(
+                        "incompatible arrow schema, expected {} struct fields got {}",
+                        parquet_fields.len(),
+                        fields.len()
+                    ));
+                }
+                Some(fields)
+            }
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected struct got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        let mut child_fields = Vec::with_capacity(parquet_fields.len());
+        let mut children = Vec::with_capacity(parquet_fields.len());
+
+        // Perform a DFS of children
+        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
+            let data_type = match arrow_fields {
+                Some(fields) => {
+                    let field = &fields[idx];
+                    if field.name() != parquet_field.name() {
+                        return Err(arrow_err!(
+                            "incompatible arrow schema, expected field named {} got {}",
+                            parquet_field.name(),
+                            field.name()
+                        ));
+                    }
+                    Some(field.data_type().clone())
+                }
+                None => None,
+            };
+
+            let arrow_field = arrow_fields.map(|x| &x[idx]);
+            let child_ctx = VisitorContext {
+                rep_level,
+                def_level,
+                data_type,
+            };
+
+            if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
+                // The child type returned may be different from what is encoded in the arrow
+                // schema in the event of a mismatch or a projection
+                child_fields.push(convert_field(parquet_field, &child, arrow_field));
+                children.push(child);
+            }
+        }
+
+        if children.is_empty() {
+            return Ok(None);
+        }
+
+        let struct_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type: DataType::Struct(child_fields),
+            field_type: ParquetFieldType::Group { children },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
+            _ => struct_field,
+        }))
+    }
+
+    fn visit_map(
+        &mut self,
+        map_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let rep_level = context.rep_level + 1;
+        let (def_level, nullable) = match get_repetition(map_type) {
+            Repetition::REQUIRED => (context.def_level + 1, false),
+            Repetition::OPTIONAL => (context.def_level + 2, true),
+            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
+        };
+
+        if map_type.get_fields().len() != 1 {
+            return Err(arrow_err!(
+                "Map field must have exactly one key_value child, found {}",
+                map_type.get_fields().len()
+            ));
+        }
+
+        // Add map entry (key_value) to context
+        let map_key_value = &map_type.get_fields()[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(arrow_err!("Child of map field must be repeated"));
+        }
+
+        if map_key_value.get_fields().len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(arrow_err!(
+                "Child of map field must have two children, found {}",
+                map_key_value.get_fields().len()
+            ));
+        }
+
+        // Get key and value, and create context for each
+        let map_key = &map_key_value.get_fields()[0];
+        let map_value = &map_key_value.get_fields()[1];
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(arrow_err!("Map keys must be required"));
+        }
+
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(arrow_err!("Map values cannot be repeated"));
+        }
+
+        // Extract the arrow fields
+        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
+            Some(DataType::Map(field, sorted)) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(arrow_err!(
+                            "Map data type should contain struct with two children, got {}",
+                            fields.len()
+                        ));
+                    }
+
+                    (Some(field), Some(&fields[0]), Some(&fields[1]), *sorted)
+                }
+                d => {
+                    return Err(arrow_err!(
+                        "Map data type should contain struct got {}",
+                        d
+                    ));
+                }
+            },
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected map got {}",
+                    d
+                ))
+            }
+            None => (None, None, None, false),
+        };
+
+        let maybe_key = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_key.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_key, context)?
+        };
+
+        let maybe_value = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_value.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_value, context)?
+        };
+
+        // Need both columns to be projected
+        match (maybe_key, maybe_value) {
+            (Some(key), Some(value)) => {
+                let key_field = convert_field(map_key, &key, arrow_key);
+                let value_field = convert_field(map_value, &value, arrow_value);
+
+                let map_field = Field::new(
+                    map_key_value.name(),
+                    DataType::Struct(vec![key_field, value_field]),
+                    nullable,
+                )
+                .with_metadata(arrow_map.and_then(|f| f.metadata().cloned()));
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type: DataType::Map(Box::new(map_field), sorted),
+                    field_type: ParquetFieldType::Group {
+                        children: vec![key, value],
+                    },
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn visit_list(
+        &mut self,
+        list_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if list_type.is_primitive() {
+            return Err(arrow_err!(
+                "{:?} is a list type and can't be processed as primitive.",
+                list_type
+            ));
+        }
+
+        let fields = list_type.get_fields();
+        if fields.len() != 1 {
+            return Err(arrow_err!(
+                "list type must have a single child, found {}",
+                fields.len()
+            ));
+        }
+
+        let repeated_field = &fields[0];
+        if get_repetition(repeated_field) != Repetition::REPEATED {
+            return Err(arrow_err!("List child must be repeated"));
+        }
+
+        // If the list is nullable
+        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => (context.def_level, false),
+            Repetition::OPTIONAL => (context.def_level + 1, true),
+            Repetition::REPEATED => {
+                return Err(arrow_err!("List type cannot be repeated"))
+            }
+        };
+
+        let arrow_field = match &context.data_type {
+            Some(DataType::List(f)) => Some(f.as_ref()),
+            Some(DataType::LargeList(f)) => Some(f.as_ref()),
+            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected list got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        if repeated_field.is_primitive() {
+            // If the repeated field is not a group, then its type is the element type and elements are required.
+            //
+            // required/optional group my_list (LIST) {
+            //   repeated int32 element;
+            // }
+            //
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_primitive(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    // visit_primitive will infer a non-nullable list, update if necessary
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        let items = repeated_field.get_fields();
+        if items.len() != 1
+            || repeated_field.name() == "array"
+            || repeated_field.name() == format!("{}_tuple", list_type.name())
+        {
+            // If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
+            //
+            // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name
+            // with _tuple appended then the repeated type is the element type and elements are required.
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_struct(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        // Regular list handling logic
+        let item_type = &items[0];
+        let rep_level = context.rep_level + 1;
+        let def_level = def_level + 1;
+
+        let new_context = VisitorContext {
+            def_level,
+            rep_level,
+            data_type: arrow_field.map(|f| f.data_type().clone()),
+        };
+
+        match self.dispatch(item_type, new_context) {
+            Ok(Some(item)) => {
+                let item_field = Box::new(convert_field(item_type, &item, arrow_field));
+
+                // Use arrow type as hint for index size
+                let arrow_type = match context.data_type {
+                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
+                    Some(DataType::FixedSizeList(_, len)) => {
+                        DataType::FixedSizeList(item_field, len)
+                    }
+                    _ => DataType::List(item_field),
+                };
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type,
+                    field_type: ParquetFieldType::Group {
+                        children: vec![item],
+                    },
+                }))
+            }
+            r => r,
+        }
+    }
+
+    fn dispatch(
+        &mut self,
+        cur_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if cur_type.is_primitive() {
+            self.visit_primitive(cur_type, context)
+        } else {
+            match cur_type.get_basic_info().converted_type() {
+                ConvertedType::LIST => self.visit_list(cur_type, context),
+                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
+                    self.visit_map(cur_type, context)
+                }
+                _ => self.visit_struct(cur_type, context),
+            }
+        }
+    }
+}
+
+/// Computes the [`Field`] for a child column
+///
+/// The resulting [`Field`] will have the type dictated by `field`, a name
+/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
+fn convert_field(
+    parquet_type: &Type,
+    field: &ParquetField,
+    arrow_hint: Option<&Field>,
+) -> Field {
+    let name = parquet_type.name();
+    let data_type = field.arrow_type.clone();
+    let nullable = field.nullable;
+
+    match arrow_hint {
+        Some(hint) => {
+            // If the inferred type is a dictionary, preserve dictionary metadata
+            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
+                (DataType::Dictionary(_, _), Some(id), Some(ordered)) => {
+                    Field::new_dict(name, data_type, nullable, id, ordered)
+                }
+                _ => Field::new(name, data_type, nullable),
+            };
+
+            field.with_metadata(hint.metadata().cloned())
+        }
+        None => Field::new(name, data_type, nullable),
+    }
+}
+
+/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
+/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
+/// [`Schema`] embedded in the parquet metadata
+///
+/// Note: This does not support out of order column projection
+pub fn convert_schema<T: IntoIterator<Item = usize>>(
+    schema: &SchemaDescriptor,
+    leaf_columns: T,
+    embedded_arrow_schema: Option<&Schema>,
+) -> Result<Option<ParquetField>> {
+    let mut leaf_mask = vec![false; schema.num_columns()];
+    let mut last_idx = 0;
+    for i in leaf_columns {
+        if i < last_idx {
+            return Err(general_err!("out of order projection is not supported"));
+        }
+        last_idx = i;
+        leaf_mask[i] = true;
+    }
+
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: leaf_mask,
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+    };
+
+    visitor.dispatch(&schema.root_schema_ptr(), context)
+}
+
+/// Computes the [`ParquetField`] for the provided `parquet_type`
+pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: vec![true],
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: None,
+    };
+
+    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
+}

Review Comment:
   It would be awesome to add tests specifically for this logic that enumerate parquet types and their expected conversions to arrow.
   
   Maybe that could be done as a follow-on PR.
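Here is a rough illustration of the kind of table-driven test suggested above. The sketch enumerates the LIST encodings that `visit_list` distinguishes and records the branch each one should take. It uses a hypothetical, simplified `Repeated` model of the repeated child instead of the crate's `Type`. It therefore checks only the classification rules, not the arrow types that result.

```rust
/// Hypothetical, simplified model of the single repeated child of a
/// LIST-annotated group (not the parquet crate's `Type`)
enum Repeated {
    /// `repeated <primitive> element;`
    Primitive,
    /// `repeated group <name> { ... }` with `num_fields` children
    Group { name: &'static str, num_fields: usize },
}

/// Which branch of `visit_list` a given encoding takes
#[derive(Debug, PartialEq)]
enum ListShape {
    /// The repeated primitive is itself the (required) element
    PrimitiveElement,
    /// The repeated group is itself the (required) struct element
    GroupElement,
    /// Standard three-level list: the group's only child is the element
    ThreeLevel,
}

/// Applies the backward-compatibility rules in the order `visit_list` checks them
fn classify(list_name: &str, repeated: &Repeated) -> ListShape {
    match repeated {
        Repeated::Primitive => ListShape::PrimitiveElement,
        Repeated::Group { name, num_fields } => {
            if *num_fields != 1
                || *name == "array"
                || *name == format!("{}_tuple", list_name)
            {
                ListShape::GroupElement
            } else {
                ListShape::ThreeLevel
            }
        }
    }
}
```

A real test would go further and assert on the resulting `ParquetField::arrow_type` for each schema string.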





[GitHub] [arrow-rs] tustvold commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1125921882

   Looking into test failures




[GitHub] [arrow-rs] alamb commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1122805573

   @tustvold please let me know when you would like a substantial review of this.




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r869953255


##########
parquet/src/arrow/schema.rs:
##########
@@ -544,502 +525,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         }
     }
 }
-/// This struct is used to group methods and data structures used to convert parquet
-/// schema together.
-struct ParquetTypeConverter<'a> {
-    schema: &'a Type,
-    /// This is the columns that need to be converted to arrow schema.
-    columns_to_convert: &'a HashSet<*const Type>,
-}
-
-impl<'a> ParquetTypeConverter<'a> {
-    fn new(schema: &'a Type, columns_to_convert: &'a HashSet<*const Type>) -> Self {
-        Self {
-            schema,
-            columns_to_convert,
-        }
-    }
-
-    fn clone_with_schema(&self, other: &'a Type) -> Self {
-        Self {
-            schema: other,
-            columns_to_convert: self.columns_to_convert,
-        }
-    }
-}
-
-impl ParquetTypeConverter<'_> {

Review Comment:
   This logic is copied largely wholesale into schema/primitive.rs



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    pub rep_level: i16,
+    pub def_level: i16,
+    pub nullable: bool,
+    pub arrow_type: DataType,
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    pub fn children(&self) -> Option<&[ParquetField]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        col_idx: usize,
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to extract the
+/// necessary information to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {
+    fn visit_primitive(
+        &mut self,
+        primitive_type: TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let col_idx = self.next_col_idx;
+        self.next_col_idx += 1;
+
+        if !self.column_mask[col_idx] {
+            return Ok(None);
+        }
+
+        let repetition = get_repetition(&primitive_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let arrow_type = convert_primitive(&primitive_type, context.data_type)?;
+
+        let primitive_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type,
+            field_type: ParquetFieldType::Primitive {
+                primitive_type: primitive_type.clone(),
+                col_idx,
+            },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),

Review Comment:
   This is the logic that now supports #1680. There is comprehensive test coverage of it in schema.rs, in particular `test_arrow_schema_roundtrip`.
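The following standalone sketch shows what `ParquetField::into_list` produces for a `repeated` field that is not wrapped in a LIST-annotated group. It uses hypothetical, reduced `DataType`/`Field` stand-ins instead of arrow's types. In the PR, the outer `Field` is built separately by `convert_field`. Here it is folded into one function for brevity.

```rust
/// Hypothetical stand-ins for arrow's `DataType` and `Field`, reduced to what
/// this sketch needs
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    List(Box<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// A bare `repeated` field becomes a non-nullable list whose non-nullable
/// element reuses the parquet field's name, e.g. `repeated int32 foo;`
/// maps to `foo: List<foo: Int32 NOT NULL> NOT NULL`
fn repeated_to_list(name: &str, element_type: DataType) -> Field {
    let element = Field {
        name: name.to_string(),
        data_type: element_type,
        nullable: false,
    };
    Field {
        name: name.to_string(),
        data_type: DataType::List(Box::new(element)),
        nullable: false,
    }
}
```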



##########
parquet/src/arrow/schema.rs:
##########
@@ -1261,7 +746,7 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
+                DataType::List(Box::new(Field::new("str", DataType::Utf8, false))),

Review Comment:
   Here we can see this fixing #1681 
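Judging from the test change, the list's child field now takes its name (`str`) and nullability from the parquet schema, instead of being a fixed, nullable `element`. Below is a minimal sketch of the precedence documented on `convert_field`:

- the name comes from the parquet type;
- the data type and nullability come from the inferred field;
- the metadata comes from the arrow hint.

It uses a hypothetical, reduced `Field` with the type rendered as a string, and it omits the dictionary-id handling.

```rust
use std::collections::BTreeMap;

/// Hypothetical, reduced stand-in for arrow's `Field`; the data type is
/// rendered as a string to keep the sketch small
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
    nullable: bool,
    metadata: Option<BTreeMap<String, String>>,
}

/// Mirrors the precedence documented on `convert_field`
fn convert_field(
    parquet_name: &str,
    inferred_type: &str,
    inferred_nullable: bool,
    arrow_hint: Option<&Field>,
) -> Field {
    Field {
        // The name always comes from the parquet schema
        name: parquet_name.to_string(),
        // Type and nullability come from what was inferred during the walk
        data_type: inferred_type.to_string(),
        nullable: inferred_nullable,
        // Only the metadata is carried over from the arrow hint
        metadata: arrow_hint.and_then(|h| h.metadata.clone()),
    }
}
```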



##########
parquet/src/arrow/schema.rs:
##########
@@ -1679,7 +1168,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)

Review Comment:
   Out-of-order column projection previously misbehaved: `parquet_to_arrow_schema_by_columns` supported it, but the actual reader logic did not. This change makes it consistently unsupported (it now returns an error), as it is hard to reason about what the correct semantics would be for nested schemas.
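The ordering check can be sketched standalone as below. It is modelled on the loop in this PR's `convert_schema`, with `String` errors in place of `ParquetError`. As an assumption beyond the PR code, the sketch also rejects out-of-range indices instead of panicking on the indexing. Like the PR's `i < last_idx` test, it accepts a repeated index.

```rust
/// Builds a leaf-column mask from projected indices, rejecting any index
/// smaller than the one before it (out-of-order projection)
fn build_leaf_mask<I: IntoIterator<Item = usize>>(
    num_columns: usize,
    leaf_columns: I,
) -> Result<Vec<bool>, String> {
    let mut mask = vec![false; num_columns];
    let mut last_idx = 0;
    for i in leaf_columns {
        if i < last_idx {
            return Err("out of order projection is not supported".to_string());
        }
        // Extra guard (not in the PR): avoid panicking on a bad index
        if i >= num_columns {
            return Err(format!("column index {} out of bounds", i));
        }
        last_idx = i;
        mask[i] = true;
    }
    Ok(mask)
}
```

With five leaf columns, `vec![0, 3, 4]` yields a mask selecting columns 0, 3 and 4, while `vec![3, 4, 0]` is rejected.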



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    pub rep_level: i16,
+    pub def_level: i16,
+    pub nullable: bool,
+    pub arrow_type: DataType,
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    pub fn children(&self) -> Option<&[ParquetField]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        col_idx: usize,
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to extract the
+/// necessary information to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {

Review Comment:
   This logic is extracted from builder.rs. It wasn't possible to reuse the existing TypeVisitor, as its handling of lists interfered with #1680.
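The routing that the new `Visitor` performs can be sketched standalone as below, mirroring the shape of `Visitor::dispatch`. The `ConvertedType` and `Visit` enums here are hypothetical stand-ins. The point is that a group is treated as a list only when it carries a LIST annotation. Bare `repeated` fields fall through to `visit_primitive`/`visit_struct`, which wrap them via `into_list`.

```rust
/// Hypothetical stand-in for the converted-type annotations relevant here
#[derive(Debug, Clone, Copy, PartialEq)]
enum ConvertedType {
    Unannotated,
    List,
    Map,
    MapKeyValue,
}

/// Which visitor method a schema node is routed to
#[derive(Debug, PartialEq)]
enum Visit {
    Primitive,
    List,
    Map,
    Struct,
}

/// Primitives are always leaves; groups are routed by annotation and
/// default to a struct
fn dispatch(is_primitive: bool, converted: ConvertedType) -> Visit {
    if is_primitive {
        return Visit::Primitive;
    }
    match converted {
        ConvertedType::List => Visit::List,
        ConvertedType::Map | ConvertedType::MapKeyValue => Visit::Map,
        ConvertedType::Unannotated => Visit::Struct,
    }
}
```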



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {

Review Comment:
   This is the new structure as described in #1655 
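Each `ParquetField` records the definition and repetition levels computed during the walk. The minimal standalone sketch below shows how `VisitorContext::levels` accumulates them along a path of plain groups ending in a leaf. It uses a hypothetical `Repetition` stand-in instead of `crate::basic::Repetition`. LIST- and MAP-annotated groups adjust levels in `visit_list`/`visit_map` instead, and this sketch does not model them.

```rust
/// Hypothetical stand-in for `crate::basic::Repetition`
#[derive(Debug, Clone, Copy, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Mirrors `VisitorContext::levels`: given the parent's levels, returns the
/// node's (def_level, rep_level, nullable)
fn levels(def_level: i16, rep_level: i16, repetition: Repetition) -> (i16, i16, bool) {
    match repetition {
        // An optional node adds one definition level and is nullable
        Repetition::Optional => (def_level + 1, rep_level, true),
        // A required node adds no levels
        Repetition::Required => (def_level, rep_level, false),
        // A repeated node adds both a definition and a repetition level
        Repetition::Repeated => (def_level + 1, rep_level + 1, false),
    }
}

/// Folds `levels` over a root-to-leaf path of un-annotated groups and a leaf
fn leaf_levels(path: &[Repetition]) -> (i16, i16, bool) {
    let mut state = (0, 0, false);
    for &repetition in path {
        state = levels(state.0, state.1, repetition);
    }
    state
}
```

Consider `optional group a { repeated group b { optional int32 c; } }`. The leaf `c` ends up with a definition level of 3 and a repetition level of 1.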



##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory,
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses an type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {

Review Comment:
   This is the change that fixes #1663: the embedded arrow schema is now only used to hint types, with the type inferred from the parquet schema as the starting point.
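The actual body of `apply_hint` is cut off in this hunk, so the sketch below is a hypothetical illustration of the principle only. The type inferred from the parquet schema is the default. The embedded arrow type is adopted only when it is a compatible alternative representation of the same data. The compatible pairs chosen here (`Utf8` to `LargeUtf8`, `Binary` to `LargeBinary`) and the reduced `DataType` enum are assumptions, not the crate's rules.

```rust
/// Hypothetical, reduced set of arrow types for this sketch
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
    LargeUtf8,
    Binary,
    LargeBinary,
}

/// Illustrative only: keeps the type inferred from parquet unless the hint
/// is an assumed-compatible alternative representation of the same data
fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
    match (parquet, hint) {
        // Same bytes, wider offsets: honour the hint
        (DataType::Utf8, DataType::LargeUtf8) => DataType::LargeUtf8,
        (DataType::Binary, DataType::LargeBinary) => DataType::LargeBinary,
        // Any other hint is treated as advisory and ignored
        (parquet, _) => parquet,
    }
}
```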



##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    pub rep_level: i16,
+    pub def_level: i16,
+    pub nullable: bool,
+    pub arrow_type: DataType,
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    pub fn children(&self) -> Option<&[ParquetField]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        col_idx: usize,
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,

Review Comment:
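   This is what fixes #1654: we carry the `DataType` from the embedded arrow schema down as we walk the tree, so each field is hinted by its own corresponding arrow type, rather than looking the type up by field name in the top-level arrow schema as the old builder did.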
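   To see why carrying the hint matters, here is a hypothetical sketch with toy `Node` and `Hint` types (not the crate's `Type` or `DataType`) that hands each child the hint at the same position during a depth-first walk. A lookup by name in the top-level schema would give both leaves named `a` in the test tree the same hint; passing the hint down keeps them apart. Matching children by position, and dropping the hint when the shapes disagree, are assumptions of this sketch.

```rust
/// Hypothetical simplified parquet schema node.
#[allow(dead_code)]
#[derive(Debug)]
enum Node {
    Leaf { name: String },
    Group { name: String, children: Vec<Node> },
}

/// Hypothetical simplified embedded arrow type, used only as a hint.
#[derive(Debug, Clone, PartialEq)]
enum Hint {
    /// A leaf type, e.g. "Int64"
    Leaf(String),
    /// Child hints, in the same order as the parquet group's children
    Struct(Vec<Hint>),
}

/// Depth-first walk that hands each child the hint at the same position,
/// recording (leaf name, resolved hint) pairs in `out`.
fn visit(node: &Node, hint: Option<&Hint>, out: &mut Vec<(String, Option<String>)>) {
    match node {
        Node::Leaf { name } => {
            let leaf_hint = match hint {
                Some(Hint::Leaf(t)) => Some(t.clone()),
                _ => None, // missing or mismatched hint: no hint for this leaf
            };
            out.push((name.clone(), leaf_hint));
        }
        Node::Group { children, .. } => {
            // Only descend into the hint if its shape matches the parquet group
            let child_hints = match hint {
                Some(Hint::Struct(h)) if h.len() == children.len() => Some(h),
                _ => None,
            };
            for (i, child) in children.iter().enumerate() {
                visit(child, child_hints.map(|h| &h[i]), out);
            }
        }
    }
}
```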



##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -1050,6 +1050,41 @@ mod tests {
         for batch in record_batch_reader {
             batch.unwrap();
         }
+
+        let projected_reader = arrow_reader

Review Comment:
   This is a test for #1654 and #1652
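   The projected reader relies on the `column_mask` carried by the new `Visitor` in `complex.rs`: leaves are numbered depth-first, and masked-out leaves return `None`. Below is a hypothetical sketch of that idea with toy types; pruning groups that end up with no selected leaves is an assumption of the sketch, not something stated in this thread. The key detail is that `next_col_idx` advances for every leaf, including skipped ones, so the surviving leaves keep their original column indices.

```rust
/// Hypothetical simplified schema node.
enum Node {
    Leaf(&'static str),
    Group(&'static str, Vec<Node>),
}

/// Projected node; leaves keep their original (pre-projection) column index.
#[derive(Debug, PartialEq)]
enum Projected {
    Leaf { name: &'static str, col_idx: usize },
    Group { name: &'static str, children: Vec<Projected> },
}

struct Visitor {
    /// Column index of the next leaf, in depth-first order
    next_col_idx: usize,
    /// Which leaf columns to include
    column_mask: Vec<bool>,
}

impl Visitor {
    fn visit(&mut self, node: &Node) -> Option<Projected> {
        match node {
            Node::Leaf(name) => {
                let col_idx = self.next_col_idx;
                // Always advance, even for skipped leaves, so indices stay aligned
                self.next_col_idx += 1;
                self.column_mask[col_idx].then(|| Projected::Leaf { name: *name, col_idx })
            }
            Node::Group(name, children) => {
                // Visit every child (keeps next_col_idx correct), keep the selected ones
                let kept: Vec<Projected> =
                    children.iter().filter_map(|c| self.visit(c)).collect();
                // Assumed: a group with no selected leaves is pruned entirely
                (!kept.is_empty()).then(|| Projected::Group { name: *name, children: kept })
            }
        }
    }
}
```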



[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872191483


##########
parquet/src/arrow/arrow_writer.rs:
##########
@@ -1058,7 +1058,7 @@ mod tests {
         let stocks_field = Field::new(
             "stocks",
             DataType::Map(
-                Box::new(Field::new("entries", entries_struct_type, false)),
+                Box::new(Field::new("entries", entries_struct_type, true)),

Review Comment:
   I wrote up the issue: https://github.com/apache/arrow-rs/issues/1697
   
   In doing so, I came to think this change is wrong; I will revert it.



[GitHub] [arrow-rs] alamb commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1126304991

   😅 Glad I didn't try to include this in `14.0.0`; see https://github.com/apache/arrow-rs/issues/1701


[GitHub] [arrow-rs] jhorstmann commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
jhorstmann commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1125987741

   I didn't review this in detail, but did run our test suite against this branch and did not notice any issues.


[GitHub] [arrow-rs] alamb commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1126287130

   Merging (this is the first PR in what will be released as arrow 15.0.0) 🎉 


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r869957607


##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    pub rep_level: i16,
+    pub def_level: i16,
+    pub nullable: bool,
+    pub arrow_type: DataType,
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    pub fn children(&self) -> Option<&[ParquetField]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        col_idx: usize,
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to extract the
+/// necessary information to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {
+    fn visit_primitive(
+        &mut self,
+        primitive_type: TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let col_idx = self.next_col_idx;
+        self.next_col_idx += 1;
+
+        if !self.column_mask[col_idx] {
+            return Ok(None);
+        }
+
+        let repetition = get_repetition(&primitive_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let arrow_type = convert_primitive(&primitive_type, context.data_type)?;
+
+        let primitive_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type,
+            field_type: ParquetFieldType::Primitive {
+                primitive_type: primitive_type.clone(),
+                col_idx,
+            },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),

Review Comment:
   Here is the logic that adds support for #1680 by wrapping a repeated primitive in a list. There is comprehensive test coverage of this in schema.rs, in particular `test_arrow_schema_roundtrip`.
   
   I'm actually quite pleased with this: despite the underlying list representation in parquet being fundamentally different, the ArrayBuilder can be completely oblivious to this fact :smile: 
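   A worked sketch of what happens here: the level rule below is copied from `VisitorContext::levels` in the quoted diff, and `into_list` keeps the child's levels while producing a non-nullable list of non-nullable items. Everything else is hypothetical: `FieldSketch` is a toy stand-in for `ParquetField`, and the arrow type is rendered as a plain string. For `repeated int32 foo` at the root, the result is a `List` at definition level 1 and repetition level 1.

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

#[derive(Debug, Clone, Copy)]
struct VisitorContext {
    rep_level: i16,
    def_level: i16,
}

impl VisitorContext {
    /// Same rule as `levels` in the quoted diff: (def_level, rep_level, nullable)
    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
        match repetition {
            Repetition::Optional => (self.def_level + 1, self.rep_level, true),
            Repetition::Required => (self.def_level, self.rep_level, false),
            Repetition::Repeated => (self.def_level + 1, self.rep_level + 1, false),
        }
    }
}

/// Hypothetical stand-in for `ParquetField`; the arrow type is a string here.
#[derive(Debug, PartialEq)]
struct FieldSketch {
    arrow_type: String,
    def_level: i16,
    rep_level: i16,
    nullable: bool,
}

impl FieldSketch {
    /// Like `into_list` in the diff: same levels, non-nullable list of non-nullable items
    fn into_list(self, name: &str) -> FieldSketch {
        FieldSketch {
            arrow_type: format!("List<{}: {} not null>", name, self.arrow_type),
            def_level: self.def_level,
            rep_level: self.rep_level,
            nullable: false,
        }
    }
}

/// Sketch of `visit_primitive`, minus column masking and type conversion.
fn visit_primitive(name: &str, arrow_type: &str, repetition: Repetition, ctx: VisitorContext) -> FieldSketch {
    let (def_level, rep_level, nullable) = ctx.levels(repetition);
    let field = FieldSketch { arrow_type: arrow_type.to_string(), def_level, rep_level, nullable };
    match repetition {
        Repetition::Repeated => field.into_list(name),
        _ => field,
    }
}
```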



[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r869976905


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),

Review Comment:
   None of the readers actually use the column path, so rather than tracking it, we just populate an empty `ColumnPath`.



[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872193044


##########
parquet/src/arrow/schema.rs:
##########
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
             .with_repetition(repetition)
             .build(),
-        DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
-            name,
-            PhysicalType::INT64,
-        )
-        .with_logical_type(Some(LogicalType::Timestamp {
-            is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
-            unit: match time_unit {
-                TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
-                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
-            },
-        }))
-        .with_repetition(repetition)
-        .build(),
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_repetition(repetition)
+                .build()
+        }
+        DataType::Timestamp(time_unit, _) => {
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_logical_type(Some(LogicalType::Timestamp {
+                    is_adjusted_to_u_t_c: false,
+                    unit: match time_unit {
+                        TimeUnit::Second => unreachable!(),
+                        TimeUnit::Millisecond => {
+                            ParquetTimeUnit::MILLIS(Default::default())
+                        }
+                        TimeUnit::Microsecond => {
+                            ParquetTimeUnit::MICROS(Default::default())
+                        }
+                        TimeUnit::Nanosecond => {
+                            ParquetTimeUnit::NANOS(Default::default())
+                        }
+                    },
+                }))
+                .with_repetition(repetition)
+                .build()
+        }
         DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        // date64 is cast to date32
+        // date64 is cast to date32 (#1666)
         DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
+        DataType::Time32(TimeUnit::Second) => {
+            // Cannot represent seconds in LogicalType

Review Comment:
   Yes. To be clear, this wouldn't be wrong if the writer coerced the types to match; the problem is that it does not.
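   Parquet's TIMESTAMP and TIME logical types only offer MILLIS, MICROS and NANOS units, so the removed mapping `TimeUnit::Second => MILLIS` labelled unscaled second values as milliseconds, and they would be read back 1000 times too small. A hypothetical sketch of the new rule for timestamps, with stand-in enums rather than the crate's `TimeUnit`/`ParquetTimeUnit`: seconds map to no logical type (a plain `INT64`), and `seconds_to_millis` is a made-up helper showing the coercion the writer would have needed for the old mapping to be correct.

```rust
/// Hypothetical stand-in for arrow's `TimeUnit`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ArrowTimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Units available for the parquet TIMESTAMP logical type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ParquetTimeUnit {
    Millis,
    Micros,
    Nanos,
}

/// `None` means "write a plain INT64 with no logical type".
fn timestamp_unit(unit: ArrowTimeUnit) -> Option<ParquetTimeUnit> {
    match unit {
        ArrowTimeUnit::Second => None,
        ArrowTimeUnit::Millisecond => Some(ParquetTimeUnit::Millis),
        ArrowTimeUnit::Microsecond => Some(ParquetTimeUnit::Micros),
        ArrowTimeUnit::Nanosecond => Some(ParquetTimeUnit::Nanos),
    }
}

/// The coercion a writer would need before labelling seconds as MILLIS.
/// Returns `None` on overflow. Hypothetical helper, not part of the crate.
fn seconds_to_millis(seconds: i64) -> Option<i64> {
    seconds.checked_mul(1_000)
}
```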



[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872202203


##########
parquet/src/arrow/schema/complex.rs:
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    /// The level which represents an insertion into the current list
+    /// i.e. guaranteed to be > 0 for a list type
+    pub rep_level: i16,
+    /// The level at which this field is fully defined,
+    /// i.e. guaranteed to be > 0 for a nullable type
+    pub def_level: i16,
+    /// Whether this field is nullable
+    pub nullable: bool,
+    /// The arrow type of the column data
+    ///
+    /// Note: In certain cases the data stored in parquet may have been coerced
+    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
+    pub arrow_type: DataType,
+    /// The type of this field
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    /// Converts `self` into an arrow list, with its current type as the field type
+    ///
+    /// This is used to convert repeated columns, into their arrow representation
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    /// Returns a list of [`ParquetField`] children if this is a group type
+    pub fn children(&self) -> Option<&[Self]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        /// The index of the column in parquet
+        col_idx: usize,
+        /// The type of the column in parquet
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+/// Encodes the context of the parent of the field currently under consideration
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    /// Compute the resulting definition level, repetition level and nullability
+    /// for a child field with the given [`Repetition`]
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {
+    fn visit_primitive(
+        &mut self,
+        primitive_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let col_idx = self.next_col_idx;
+        self.next_col_idx += 1;
+
+        if !self.column_mask[col_idx] {
+            return Ok(None);
+        }
+
+        let repetition = get_repetition(primitive_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
+
+        let primitive_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type,
+            field_type: ParquetFieldType::Primitive {
+                primitive_type: primitive_type.clone(),
+                col_idx,
+            },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
+            _ => primitive_field,
+        }))
+    }
+
+    fn visit_struct(
+        &mut self,
+        struct_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // The root type will not have a repetition level
+        let repetition = get_repetition(struct_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let parquet_fields = struct_type.get_fields();
+
+        // Extract the arrow fields
+        let arrow_fields = match &context.data_type {
+            Some(DataType::Struct(fields)) => {
+                if fields.len() != parquet_fields.len() {
+                    return Err(arrow_err!(
+                        "incompatible arrow schema, expected {} struct fields got {}",
+                        parquet_fields.len(),
+                        fields.len()
+                    ));
+                }
+                Some(fields)
+            }
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected struct got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        let mut child_fields = Vec::with_capacity(parquet_fields.len());
+        let mut children = Vec::with_capacity(parquet_fields.len());
+
+        // Perform a DFS of children
+        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
+            let data_type = match arrow_fields {
+                Some(fields) => {
+                    let field = &fields[idx];
+                    if field.name() != parquet_field.name() {
+                        return Err(arrow_err!(
+                            "incompatible arrow schema, expected field named {} got {}",
+                            parquet_field.name(),
+                            field.name()
+                        ));
+                    }
+                    Some(field.data_type().clone())
+                }
+                None => None,
+            };
+
+            let arrow_field = arrow_fields.map(|x| &x[idx]);
+            let child_ctx = VisitorContext {
+                rep_level,
+                def_level,
+                data_type,
+            };
+
+            if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
+                // The child type returned may be different from what is encoded in the arrow
+                // schema in the event of a mismatch or a projection
+                child_fields.push(convert_field(parquet_field, &child, arrow_field));
+                children.push(child);
+            }
+        }
+
+        if children.is_empty() {
+            return Ok(None);
+        }
+
+        let struct_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type: DataType::Struct(child_fields),
+            field_type: ParquetFieldType::Group { children },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
+            _ => struct_field,
+        }))
+    }
+
+    fn visit_map(
+        &mut self,
+        map_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let rep_level = context.rep_level + 1;
+        let (def_level, nullable) = match get_repetition(map_type) {
+            Repetition::REQUIRED => (context.def_level + 1, false),
+            Repetition::OPTIONAL => (context.def_level + 2, true),
+            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
+        };
+
+        if map_type.get_fields().len() != 1 {
+            return Err(arrow_err!(
+                "Map field must have exactly one key_value child, found {}",
+                map_type.get_fields().len()
+            ));
+        }
+
+        // Add map entry (key_value) to context
+        let map_key_value = &map_type.get_fields()[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(arrow_err!("Child of map field must be repeated"));
+        }
+
+        if map_key_value.get_fields().len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(arrow_err!(
+                "Child of map field must have two children, found {}",
+                map_key_value.get_fields().len()
+            ));
+        }
+
+        // Get key and value, and create context for each
+        let map_key = &map_key_value.get_fields()[0];
+        let map_value = &map_key_value.get_fields()[1];
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(arrow_err!("Map keys must be required"));
+        }
+
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(arrow_err!("Map values cannot be repeated"));
+        }
+
+        // Extract the arrow fields
+        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
+            Some(DataType::Map(field, sorted)) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(arrow_err!(
+                            "Map data type should contain struct with two children, got {}",
+                            fields.len()
+                        ));
+                    }
+
+                    (Some(field), Some(&fields[0]), Some(&fields[1]), *sorted)
+                }
+                d => {
+                    return Err(arrow_err!(
+                        "Map data type should contain struct got {}",
+                        d
+                    ));
+                }
+            },
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected map got {}",
+                    d
+                ))
+            }
+            None => (None, None, None, false),
+        };
+
+        let maybe_key = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_key.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_key, context)?
+        };
+
+        let maybe_value = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_value.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_value, context)?
+        };
+
+        // Need both columns to be projected
+        match (maybe_key, maybe_value) {
+            (Some(key), Some(value)) => {
+                let key_field = convert_field(map_key, &key, arrow_key);
+                let value_field = convert_field(map_value, &value, arrow_value);
+
+                let map_field = Field::new(
+                    map_key_value.name(),
+                    DataType::Struct(vec![key_field, value_field]),
+                    nullable,
+                )
+                .with_metadata(arrow_map.and_then(|f| f.metadata().cloned()));
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type: DataType::Map(Box::new(map_field), sorted),
+                    field_type: ParquetFieldType::Group {
+                        children: vec![key, value],
+                    },
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn visit_list(
+        &mut self,
+        list_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if list_type.is_primitive() {
+            return Err(arrow_err!(
+                "{:?} is a list type and can't be processed as primitive.",
+                list_type
+            ));
+        }
+
+        let fields = list_type.get_fields();
+        if fields.len() != 1 {
+            return Err(arrow_err!(
+                "list type must have a single child, found {}",
+                fields.len()
+            ));
+        }
+
+        let repeated_field = &fields[0];
+        if get_repetition(repeated_field) != Repetition::REPEATED {
+            return Err(arrow_err!("List child must be repeated"));
+        }
+
+        // If the list is nullable
+        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => (context.def_level, false),
+            Repetition::OPTIONAL => (context.def_level + 1, true),
+            Repetition::REPEATED => {
+                return Err(arrow_err!("List type cannot be repeated"))
+            }
+        };
+
+        let arrow_field = match &context.data_type {
+            Some(DataType::List(f)) => Some(f.as_ref()),
+            Some(DataType::LargeList(f)) => Some(f.as_ref()),
+            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected list got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        if repeated_field.is_primitive() {
+            // If the repeated field is not a group, then its type is the element type and elements are required.
+            //
+            // required/optional group my_list (LIST) {
+            //   repeated int32 element;
+            // }
+            //
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_primitive(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    // visit_primitive will infer a non-nullable list, update if necessary
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        let items = repeated_field.get_fields();
+        if items.len() != 1
+            || repeated_field.name() == "array"
+            || repeated_field.name() == format!("{}_tuple", list_type.name())
+        {
+            // If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
+            //
+            // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name
+            // with _tuple appended then the repeated type is the element type and elements are required.
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_struct(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        // Regular list handling logic
+        let item_type = &items[0];
+        let rep_level = context.rep_level + 1;
+        let def_level = def_level + 1;
+
+        let new_context = VisitorContext {
+            def_level,
+            rep_level,
+            data_type: arrow_field.map(|f| f.data_type().clone()),
+        };
+
+        match self.dispatch(item_type, new_context) {
+            Ok(Some(item)) => {
+                let item_field = Box::new(convert_field(item_type, &item, arrow_field));
+
+                // Use arrow type as hint for index size
+                let arrow_type = match context.data_type {
+                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
+                    Some(DataType::FixedSizeList(_, len)) => {
+                        DataType::FixedSizeList(item_field, len)
+                    }
+                    _ => DataType::List(item_field),
+                };
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type,
+                    field_type: ParquetFieldType::Group {
+                        children: vec![item],
+                    },
+                }))
+            }
+            r => r,
+        }
+    }
+
+    fn dispatch(
+        &mut self,
+        cur_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if cur_type.is_primitive() {
+            self.visit_primitive(cur_type, context)
+        } else {
+            match cur_type.get_basic_info().converted_type() {
+                ConvertedType::LIST => self.visit_list(cur_type, context),
+                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
+                    self.visit_map(cur_type, context)
+                }
+                _ => self.visit_struct(cur_type, context),
+            }
+        }
+    }
+}
+
+/// Computes the [`Field`] for a child column
+///
+/// The resulting [`Field`] will have the type dictated by `field`, a name
+/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
+fn convert_field(
+    parquet_type: &Type,
+    field: &ParquetField,
+    arrow_hint: Option<&Field>,
+) -> Field {
+    let name = parquet_type.name();
+    let data_type = field.arrow_type.clone();
+    let nullable = field.nullable;
+
+    match arrow_hint {
+        Some(hint) => {
+            // If the inferred type is a dictionary, preserve dictionary metadata
+            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
+                (DataType::Dictionary(_, _), Some(id), Some(ordered)) => {
+                    Field::new_dict(name, data_type, nullable, id, ordered)
+                }
+                _ => Field::new(name, data_type, nullable),
+            };
+
+            field.with_metadata(hint.metadata().cloned())
+        }
+        None => Field::new(name, data_type, nullable),
+    }
+}
+
+/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
+/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
+/// [`Schema`] embedded in the parquet metadata
+///
+/// Note: This does not support out of order column projection
+pub fn convert_schema<T: IntoIterator<Item = usize>>(
+    schema: &SchemaDescriptor,
+    leaf_columns: T,
+    embedded_arrow_schema: Option<&Schema>,
+) -> Result<Option<ParquetField>> {
+    let mut leaf_mask = vec![false; schema.num_columns()];
+    let mut last_idx = 0;
+    for i in leaf_columns {
+        if i < last_idx {
+            return Err(general_err!("out of order projection is not supported"));
+        }
+        last_idx = i;
+        leaf_mask[i] = true;
+    }
+
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: leaf_mask,
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+    };
+
+    visitor.dispatch(&schema.root_schema_ptr(), context)
+}
+
+/// Computes the [`ParquetField`] for the provided `parquet_type`
+pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: vec![true],
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: None,
+    };
+
+    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
+}

Review Comment:
   There is already fairly good coverage of the type conversion in schema.rs, but there is definitely scope for additionally testing repetition levels. Filed #1698.
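One way to test the repetition levels without reading a file is to fold the `Repetition` of each node on a root-to-leaf path through the same rules as `VisitorContext::levels` in the diff above. The sketch below is minimal and uses hypothetical `Repetition` and `Levels` types rather than the parquet crate's. `visit_list` and `visit_map` apply these increments to the LIST/MAP wrapper and its repeated child together. For the standard three-level layouts, this produces the same leaf totals as the fold.

```rust
/// Hypothetical stand-in for the parquet crate's `Repetition`, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Definition and repetition levels inherited from a field's ancestors.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Levels {
    def_level: i16,
    rep_level: i16,
}

impl Levels {
    /// Same rules as `VisitorContext::levels`: returns the child's levels and nullability.
    fn child(self, repetition: Repetition) -> (Levels, bool) {
        match repetition {
            // An optional node adds one definition level and makes the child nullable
            Repetition::Optional => (
                Levels {
                    def_level: self.def_level + 1,
                    ..self
                },
                true,
            ),
            // A required node adds no levels
            Repetition::Required => (self, false),
            // A repeated node adds one definition level and one repetition level
            Repetition::Repeated => (
                Levels {
                    def_level: self.def_level + 1,
                    rep_level: self.rep_level + 1,
                },
                false,
            ),
        }
    }
}

/// Folds the repetition of every node on a root-to-leaf path (excluding the
/// schema root, which has no repetition) into the leaf column's levels.
fn leaf_levels(path: &[Repetition]) -> Levels {
    path.iter().fold(
        Levels { def_level: 0, rep_level: 0 },
        |levels, &repetition| levels.child(repetition).0,
    )
}
```

For example, the leaf of `optional group my_list (LIST) { repeated group list { optional int32 element; } }` has the path `[Optional, Repeated, Optional]`. This gives a definition level of 3 and a repetition level of 1.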

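The backward-compatibility rules quoted in the comments of `visit_list` reduce to a small decision based on the shape of the LIST group's single repeated child. The sketch below covers only that decision. It uses a hypothetical `RepeatedChild` description instead of the parquet crate's `Type`, and a hypothetical `ElementSource` result; neither exists in arrow-rs.

```rust
/// Hypothetical, simplified view of the single repeated child of a LIST-annotated group.
enum RepeatedChild<'a> {
    /// A primitive, e.g. `repeated int32 element;`
    Primitive,
    /// A group, e.g. `repeated group list { ... }`, with its name and field count
    Group { name: &'a str, num_fields: usize },
}

/// Which node supplies the list's element type.
#[derive(Debug, PartialEq)]
enum ElementSource {
    /// The repeated field itself is the (required) element type
    RepeatedField,
    /// The repeated group's only child is the element type (standard three-level list)
    InnerField,
}

/// Mirrors the branch order in `visit_list` for a LIST group named `list_name`.
fn element_source(list_name: &str, child: &RepeatedChild) -> ElementSource {
    match child {
        // A primitive repeated field is the element type
        RepeatedChild::Primitive => ElementSource::RepeatedField,
        RepeatedChild::Group { name, num_fields } => {
            // Multi-field groups, and legacy groups named `array` or `<list>_tuple`,
            // are themselves the element type
            if *num_fields != 1 || *name == "array" || *name == format!("{}_tuple", list_name) {
                ElementSource::RepeatedField
            } else {
                ElementSource::InnerField
            }
        }
    }
}
```

For a standard `optional group my_list (LIST) { repeated group list { optional int32 element; } }`, the repeated child is a one-field group named `list`, so the element type is the inner `element` field. A legacy `repeated group array { ... }` is itself the element type.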


[GitHub] [arrow-rs] tustvold commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1123274440

   OK, I've backed out the changes related to #1666 from this PR, so it should preserve the existing schema inference behaviour.
   
   I'm confident that this PR will lay the groundwork to make #1666 relatively straightforward.


[GitHub] [arrow-rs] alamb commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1123723375

   I will review this later today


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r869976905


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),

Review Comment:
   We don't track the column path in ParquetField, as it adds a potential source of bugs when the arrow mapping converts more than one level of the parquet schema into a single arrow field.
   
   None of the readers actually use this field, but `ColumnDescriptor` requires it, so we just stick a placeholder in.
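A minimal sketch of what the readers still rely on once the column path is dropped. It assumes a hypothetical, simplified `Repetition` enum (not the parquet crate's type) and mirrors the level bookkeeping in the removed visitor code: an OPTIONAL node adds one definition level, and a REPEATED node adds one definition level and one repetition level. A standard three-level list (`optional group my_list (LIST) { repeated group list { optional int32 element } }`) spans three parquet nodes but maps to a single arrow `List` field and its item, while the leaf still resolves to one well-defined `(def_level, rep_level)` pair.

```rust
/// Hypothetical stand-in for parquet's `Repetition` (illustration only).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Returns `(def_level, rep_level)` for a leaf, given the repetition of every
/// node on its path below the (unnamed) root schema.
///
/// OPTIONAL adds a definition level; REPEATED adds a definition level and a
/// repetition level; REQUIRED adds nothing.
pub fn levels(path: &[Repetition]) -> (i16, i16) {
    path.iter().fold((0, 0), |(def, rep), r| match r {
        Repetition::Required => (def, rep),
        Repetition::Optional => (def + 1, rep),
        Repetition::Repeated => (def + 1, rep + 1),
    })
}
```

For the three-level list above, the path `[Optional, Repeated, Optional]` yields `(3, 1)`; none of the three node names is needed to compute it.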





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872203199


##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory.
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses a type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
+    match (&parquet, &hint) {
+        // Not all time units can be represented as LogicalType / ConvertedType
+        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
+        (DataType::Int32, DataType::Time32(_)) => hint,
+        (DataType::Int64, DataType::Time64(_)) => hint,
+
+        // Date64 doesn't have a corresponding LogicalType / ConvertedType
+        (DataType::Int64, DataType::Date64) => hint,
+
+        // Coerce Date32 back to Date64 (#1666)
+        (DataType::Date32, DataType::Date64) => hint,
+
+        // Determine timezone
+        (DataType::Timestamp(p, None), DataType::Timestamp(h, Some(_))) if p == h => hint,
+
+        // Determine offset size
+        (DataType::Utf8, DataType::LargeUtf8) => hint,
+        (DataType::Binary, DataType::LargeBinary) => hint,
+
+        // Determine interval time unit (#1666)
+        (DataType::Interval(_), DataType::Interval(_)) => hint,
+
+        // Potentially preserve dictionary encoding
+        (_, DataType::Dictionary(_, value)) => {
+            // Apply hint to inner type
+            let hinted = apply_hint(parquet, value.as_ref().clone());
+
+            // If matches dictionary value - preserve dictionary
+            // otherwise use hinted inner type
+            match &hinted == value.as_ref() {
+                true => hint,
+                false => hinted,
+            }
+        }
+        _ => parquet,
+    }
+}
+
+fn from_parquet(parquet_type: &Type) -> Result<DataType> {
+    match parquet_type {
+        Type::PrimitiveType {
+            physical_type,
+            basic_info,
+            type_length,
+            scale,
+            precision,
+            ..
+        } => match physical_type {
+            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
+            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
+            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+            PhysicalType::FLOAT => Ok(DataType::Float32),
+            PhysicalType::DOUBLE => Ok(DataType::Float64),
+            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
+            }
+        },
+        Type::GroupType { .. } => unreachable!(),
+    }
+}
+
+fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+    let scale = scale
+        .try_into()
+        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+    let precision = precision
+        .try_into()
+        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
+
+    Ok(DataType::Decimal(precision, scale))
+}
+
+fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int32),
+        (
+            Some(
+                ref t @ LogicalType::Integer {
+                    bit_width,
+                    is_signed,
+                },
+            ),
+            _,
+        ) => match (bit_width, is_signed) {
+            (8, true) => Ok(DataType::Int8),
+            (16, true) => Ok(DataType::Int16),
+            (32, true) => Ok(DataType::Int32),
+            (8, false) => Ok(DataType::UInt8),
+            (16, false) => Ok(DataType::UInt16),
+            (32, false) => Ok(DataType::UInt32),
+            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
+        },
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+            _ => Err(arrow_err!(
+                "Cannot create INT32 physical type from {:?}",
+                unit
+            )),
+        },
+        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
+        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
+        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
+        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
+        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
+        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
+        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
+        (None, ConvertedType::DATE) => Ok(DataType::Date32),
+        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int64),
+        (
+            Some(LogicalType::Integer {
+                bit_width,
+                is_signed,
+            }),
+            _,
+        ) if bit_width == 64 => match is_signed {
+            true => Ok(DataType::Int64),
+            false => Ok(DataType::UInt64),
+        },
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => {
+                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
+            }
+            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
+        },
+        (
+            Some(LogicalType::Timestamp {
+                is_adjusted_to_u_t_c,
+                unit,
+            }),
+            _,
+        ) => Ok(DataType::Timestamp(
+            match unit {
+                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
+                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
+                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
+            },
+            if is_adjusted_to_u_t_c {
+                Some("UTC".to_string())
+            } else {
+                None
+            },
+        )),
+        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
+        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
+        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+        (None, ConvertedType::TIMESTAMP_MILLIS) => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        (None, ConvertedType::TIMESTAMP_MICROS) => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
+        (Some(LogicalType::Json), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
+        (None, ConvertedType::NONE) => Ok(DataType::Binary),
+        (None, ConvertedType::JSON) => Ok(DataType::Binary),
+        (None, ConvertedType::BSON) => Ok(DataType::Binary),
+        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
+        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_fixed_len_byte_array(
+    info: &BasicTypeInfo,
+    scale: i32,
+    precision: i32,
+    type_length: i32,
+) -> Result<DataType> {
+    // TODO: This should check the type length for the decimal and interval types
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::INTERVAL) => {
+            // There is currently no reliable way of determining which IntervalUnit
+            // to return. Thus without the original Arrow schema, the results
+            // would be incorrect if all 12 bytes of the interval are populated
+            Ok(DataType::Interval(IntervalUnit::DayTime))
+        }
+        _ => Ok(DataType::FixedSizeBinary(type_length)),
+    }
+}

Review Comment:
   These type conversions are very well covered by the unit tests in schema.rs 
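The hint rules in `apply_hint` can be exercised in isolation. A minimal sketch follows, assuming a cut-down, hypothetical `DataType` enum in place of arrow's (only a handful of variants), that mirrors a subset of the match arms in the diff: accept the hint only where it refines the type inferred from parquet, recurse into dictionary value types, and otherwise keep the inferred type.

```rust
/// Cut-down, hypothetical stand-in for arrow's `DataType` (illustration only).
#[derive(Clone, Debug, PartialEq)]
pub enum DataType {
    Int32,
    Int64,
    Binary,
    Utf8,
    LargeUtf8,
    Date32,
    Date64,
    Dictionary(Box<DataType>, Box<DataType>),
}

/// Prefers `hint` (from the embedded arrow schema) only where it refines the
/// type inferred from parquet; otherwise keeps the inferred type.
pub fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
    match (&parquet, &hint) {
        // Date64 has no LogicalType / ConvertedType of its own
        (DataType::Int64, DataType::Date64) => hint,
        // Coerce Date32 back to Date64
        (DataType::Date32, DataType::Date64) => hint,
        // Offset size is not recorded in parquet
        (DataType::Utf8, DataType::LargeUtf8) => hint,
        // Keep dictionary encoding only if the hinted value type matches
        (_, DataType::Dictionary(_, value)) => {
            let hinted = apply_hint(parquet.clone(), (**value).clone());
            if hinted == **value {
                hint.clone()
            } else {
                hinted
            }
        }
        // Incompatible hint: fall back to the type inferred from parquet
        _ => parquet,
    }
}
```

For example, a `Utf8` column hinted as `Dictionary(Int32, LargeUtf8)` keeps the dictionary, whereas a `Binary` column hinted as `Dictionary(Int32, Utf8)` stays `Binary`, because the inner hint cannot be reconciled.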

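The `LogicalType::Integer` arms of `from_int32` and `from_int64` amount to a lookup on the physical width plus `(bit_width, is_signed)`. A minimal sketch of that table, assuming a hypothetical `IntType` enum in place of arrow's `DataType` and a plain `String` error instead of `ParquetError`: INT32 accepts 8, 16 and 32-bit annotations, INT64 accepts only 64-bit ones, and every other combination is rejected.

```rust
/// Hypothetical stand-in for the arrow integer types (illustration only).
#[derive(Debug, PartialEq)]
pub enum IntType {
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
}

/// Maps an `Integer { bit_width, is_signed }` annotation on a physical
/// INT32 or INT64 column (`physical_bits` = 32 or 64) to an integer type.
pub fn integer_type(physical_bits: u8, bit_width: i8, is_signed: bool) -> Result<IntType, String> {
    match (physical_bits, bit_width, is_signed) {
        (32, 8, true) => Ok(IntType::Int8),
        (32, 16, true) => Ok(IntType::Int16),
        (32, 32, true) => Ok(IntType::Int32),
        (32, 8, false) => Ok(IntType::UInt8),
        (32, 16, false) => Ok(IntType::UInt16),
        (32, 32, false) => Ok(IntType::UInt32),
        (64, 64, true) => Ok(IntType::Int64),
        (64, 64, false) => Ok(IntType::UInt64),
        // e.g. a 64-bit annotation on INT32, or a 32-bit one on INT64
        _ => Err(format!(
            "Cannot create INT{} physical type from Integer({}, {})",
            physical_bits, bit_width, is_signed
        )),
    }
}
```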




[GitHub] [arrow-rs] alamb commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1124283097

   Sorry, I don't think I will get to this today; I will do it first thing tomorrow. Sorry @tustvold 😞 I just need to find enough contiguous time to do the review, and that is hard to come by.




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872205539


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),
+    ));
 
-        let arrow_type: Option<ArrowType> = self
-            .get_arrow_field(&cur_type, context)
-            .map(|f| f.data_type().clone());
+    let page_iterator = row_groups.column_chunks(col_idx)?;
+    let null_mask_only = field.def_level == 1 && field.nullable;
+    let arrow_type = Some(field.arrow_type.clone());
 
-        match cur_type.get_physical_type() {
-            PhysicalType::BOOLEAN => Ok(Box::new(
-                PrimitiveArrayReader::<BoolType>::new_with_options(
+    match physical_type {
+        PhysicalType::BOOLEAN => Ok(Box::new(
+            PrimitiveArrayReader::<BoolType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT32 => {
+            if let Some(DataType::Null) = arrow_type {
+                Ok(Box::new(NullArrayReader::<Int32Type>::new(
                     page_iterator,
                     column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT32 => {
-                if let Some(ArrowType::Null) = arrow_type {
-                    Ok(Box::new(NullArrayReader::<Int32Type>::new(
+                )?))
+            } else {
+                Ok(Box::new(
+                    PrimitiveArrayReader::<Int32Type>::new_with_options(
                         page_iterator,
                         column_desc,
-                    )?))
+                        arrow_type,
+                        null_mask_only,
+                    )?,
+                ))
+            }
+        }
+        PhysicalType::INT64 => Ok(Box::new(
+            PrimitiveArrayReader::<Int64Type>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT96 => {
+            // get the optional timezone information from arrow type
+            let timezone = arrow_type.as_ref().and_then(|data_type| {
+                if let DataType::Timestamp(_, tz) = data_type {
+                    tz.clone()
                 } else {
-                    Ok(Box::new(
-                        PrimitiveArrayReader::<Int32Type>::new_with_options(
-                            page_iterator,
-                            column_desc,
-                            arrow_type,
-                            null_mask_only,
-                        )?,
-                    ))
+                    None
                 }
-            }
-            PhysicalType::INT64 => Ok(Box::new(
-                PrimitiveArrayReader::<Int64Type>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT96 => {
-                // get the optional timezone information from arrow type
-                let timezone = arrow_type.as_ref().and_then(|data_type| {
-                    if let ArrowType::Timestamp(_, tz) = data_type {
-                        tz.clone()
-                    } else {
-                        None
-                    }
-                });
-                let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            });
+            let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            Ok(Box::new(ComplexObjectArrayReader::<
+                Int96Type,
+                Int96Converter,
+            >::new(
+                page_iterator,
+                column_desc,
+                converter,
+                arrow_type,
+            )?))
+        }
+        PhysicalType::FLOAT => Ok(Box::new(
+            PrimitiveArrayReader::<FloatType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::DOUBLE => Ok(Box::new(
+            PrimitiveArrayReader::<DoubleType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::BYTE_ARRAY => match arrow_type {
+            Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+            _ => make_byte_array_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+        },
+        PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
+            DataType::Decimal(precision, scale) => {
+                let converter = DecimalConverter::new(DecimalArrayConverter::new(
+                    precision as i32,
+                    scale as i32,
+                ));
                 Ok(Box::new(ComplexObjectArrayReader::<
-                    Int96Type,
-                    Int96Converter,
+                    FixedLenByteArrayType,
+                    DecimalConverter,
                 >::new(
                     page_iterator,
                     column_desc,
                     converter,
                     arrow_type,
                 )?))
             }
-            PhysicalType::FLOAT => Ok(Box::new(
-                PrimitiveArrayReader::<FloatType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::DOUBLE => Ok(Box::new(
-                PrimitiveArrayReader::<DoubleType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::BYTE_ARRAY => match arrow_type {
-                Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+            DataType::Interval(IntervalUnit::DayTime) => {
+                let converter =
+                    IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-                _ => make_byte_array_reader(
+                )?))
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                let converter =
+                    IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-            },
-            PhysicalType::FIXED_LEN_BYTE_ARRAY
-                if cur_type.get_basic_info().converted_type()
-                    == ConvertedType::DECIMAL =>
-            {
-                let converter = DecimalConverter::new(DecimalArrayConverter::new(
-                    cur_type.get_precision(),
-                    cur_type.get_scale(),
-                ));
+                )?))
+            }
+            _ => {
+                let converter =
+                    FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
                 Ok(Box::new(ComplexObjectArrayReader::<
                     FixedLenByteArrayType,
-                    DecimalConverter,
+                    FixedLenBinaryConverter,
                 >::new(
                     page_iterator,
                     column_desc,
                     converter,
                     arrow_type,
                 )?))
             }
-            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                let byte_width = match *cur_type {
-                    Type::PrimitiveType {
-                        ref type_length, ..
-                    } => *type_length,
-                    _ => {
-                        return Err(ArrowError(
-                            "Expected a physical type, not a group type".to_string(),
-                        ))
-                    }
-                };
-                if cur_type.get_basic_info().converted_type() == ConvertedType::INTERVAL {
-                    if byte_width != 12 {
-                        return Err(ArrowError(format!(
-                            "Parquet interval type should have length of 12, found {}",
-                            byte_width
-                        )));
-                    }
-                    match arrow_type {
-                        Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
-                            let converter = IntervalDayTimeConverter::new(
-                                IntervalDayTimeArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
-                            let converter = IntervalYearMonthConverter::new(
-                                IntervalYearMonthArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(t) => Err(ArrowError(format!(
-                            "Cannot write a Parquet interval to {:?}",
-                            t
-                        ))),
-                        None => {
-                            // we do not support an interval not matched to an Arrow type,
-                            // because we risk data loss as we won't know which of the 12 bytes
-                            // are or should be populated
-                            Err(ArrowError(
-                                "Cannot write a Parquet interval with no Arrow type specified.
-                                There is a risk of data loss as Arrow either supports YearMonth or
-                                DayTime precision. Without the Arrow type, we cannot infer the type.
-                                ".to_string()
-                            ))
-                        }
-                    }
-                } else {
-                    let converter = FixedLenBinaryConverter::new(
-                        FixedSizeArrayConverter::new(byte_width),
-                    );
-                    Ok(Box::new(ComplexObjectArrayReader::<
-                        FixedLenByteArrayType,
-                        FixedLenBinaryConverter,
-                    >::new(
-                        page_iterator,
-                        column_desc,
-                        converter,
-                        arrow_type,
-                    )?))
-                }
-            }
-        }
-    }
-
-    /// Constructs struct array reader without considering repetition.
-    fn build_for_struct_type_inner(
-        &mut self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut fields = Vec::with_capacity(cur_type.get_fields().len());
-        let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
-
-        for child in cur_type.get_fields() {
-            if let Some(child_reader) = self.dispatch(child.clone(), context)? {
-                // TODO: this results in calling get_arrow_field twice, it could be reused
-                // from child_reader above, by making child_reader carry its `Field`
-                let mut struct_context = context.clone();
-                struct_context.path.append(vec![child.name().to_string()]);
-                let field = match self.get_arrow_field(child, &struct_context) {
-                    Some(f) => f.clone(),
-                    _ => Field::new(
-                        child.name(),
-                        child_reader.get_data_type().clone(),
-                        child.is_optional(),
-                    ),
-                };
-                fields.push(field);
-                children_reader.push(child_reader);
-            }
-        }
-
-        if !fields.is_empty() {
-            let arrow_type = ArrowType::Struct(fields);
-            Ok(Some(Box::new(StructArrayReader::new(
-                arrow_type,
-                children_reader,
-                context.def_level,
-                context.rep_level,
-            ))))
-        } else {
-            Ok(None)
-        }
+        },
     }
+}
 
-    fn get_arrow_field(
-        &self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Option<&Field> {
-        let parts: Vec<&str> = context
-            .path
-            .parts()
-            .iter()
-            .map(|x| -> &str { x })
-            .collect::<Vec<&str>>();
-
-        // If the parts length is one it'll have the top level "schema" type. If
-        // it's two then it'll be a top-level type that we can get from the arrow
-        // schema directly.
-        if parts.len() <= 2 {
-            self.arrow_schema.field_with_name(cur_type.name()).ok()
-        } else {
-            // If it's greater than two then we need to traverse the type path
-            // until we find the actual field we're looking for.
-            let mut field: Option<&Field> = None;
-
-            for (i, part) in parts.iter().enumerate().skip(1) {
-                if i == 1 {
-                    field = self.arrow_schema.field_with_name(part).ok();
-                } else if let Some(f) = field {
-                    match f.data_type() {
-                        ArrowType::Struct(fields) => {
-                            field = fields.iter().find(|f| f.name() == part)
-                        }
-                        ArrowType::List(list_field) => match list_field.data_type() {
-                            ArrowType::Struct(fields) => {
-                                field = fields.iter().find(|f| f.name() == part)
-                            }
-                            _ => field = Some(list_field.as_ref()),
-                        },
-                        _ => field = None,
-                    }
-                } else {
-                    field = None;
-                }
-            }
-            field
-        }
-    }
+/// Constructs struct array reader without considering repetition.

Review Comment:
   Oh, that's a copy-pasta: this used to be called `build_for_struct_type_inner`, and there was an outer `visit_struct` that handled the level shenanigans. That logic is now all handled by `ParquetField`.
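The "level shenanigans" are the Parquet definition and repetition levels. Below is a minimal sketch of the kind of bookkeeping now centralized in `ParquetField`. It accumulates a leaf's maximum levels along its path from the root, using the same rules the removed `visit_*` code applied to its context: OPTIONAL adds a definition level, REPEATED adds one of each. The `Repetition`, `Levels`, `compute_levels`, and `null_mask_only` names are hypothetical illustrations, not the `parquet` crate's API. The last helper mirrors the `null_mask_only` expression in `build_primitive_reader` in the diff above.

```rust
/// Hypothetical mirror of a Parquet field's repetition (not the `parquet` crate's enum).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Hypothetical stand-in for the level bookkeeping `ParquetField` now carries.
#[derive(Debug, PartialEq)]
pub struct Levels {
    pub def_level: i16,
    pub rep_level: i16,
}

/// Accumulates the maximum definition and repetition levels along a
/// root-to-leaf path (the schema root itself is not included):
/// OPTIONAL adds a definition level, REPEATED adds one of each,
/// and REQUIRED adds nothing.
pub fn compute_levels(path: &[Repetition]) -> Levels {
    let mut levels = Levels { def_level: 0, rep_level: 0 };
    for repetition in path {
        match repetition {
            Repetition::Required => {}
            Repetition::Optional => levels.def_level += 1,
            Repetition::Repeated => {
                levels.def_level += 1;
                levels.rep_level += 1;
            }
        }
    }
    levels
}

/// Mirrors `field.def_level == 1 && field.nullable` from the diff: true for a
/// nullable leaf with no optional or repeated ancestors, whose definition
/// levels therefore only encode its own null mask.
pub fn null_mask_only(levels: &Levels, nullable: bool) -> bool {
    levels.def_level == 1 && nullable
}
```

For example, a leaf under `optional` list -> `repeated` group -> `optional` element ends up with a definition level of 3 and a repetition level of 1. That leaf is not a "null mask only" column, whereas a single top-level optional column is.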




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r872194456


##########
parquet/src/arrow/schema.rs:
##########
@@ -1261,7 +746,7 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
+                DataType::List(Box::new(Field::new("str", DataType::Utf8, false))),

Review Comment:
   That comment is still correct: this is a nullable list with non-nullable elements, as described by that Parquet schema.
   
   The test was previously wrong.
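The point above depends on how a LIST-annotated group maps to Arrow nullability. The outer group's repetition decides whether the list is nullable, and the element's own repetition decides whether items are. Below is a simplified sketch of the list-resolution rules from the Parquet LogicalTypes specification, including its backward-compatibility cases. The `Node` type and `resolve_list` function are hypothetical, not the `parquet` crate's API. The schema `optional group my_list (LIST) { repeated group element { required binary str (UTF8); } }` is an illustrative assumption, not a quote of the test. Under these rules it resolves to a nullable list whose non-nullable element is named `str`, which matches the updated expectation `Field::new("str", DataType::Utf8, false)`.

```rust
/// Hypothetical mirror of a Parquet field's repetition.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Hypothetical, simplified schema node: a name, a repetition and children
/// (a leaf is modelled as a node with no children).
pub struct Node {
    pub name: &'static str,
    pub repetition: Repetition,
    pub children: Vec<Node>,
}

/// Resolves a LIST-annotated group to (list_nullable, element_name, element_nullable).
/// Returns `None` if the group does not have exactly one repeated child.
pub fn resolve_list(list: &Node) -> Option<(bool, &'static str, bool)> {
    if list.children.len() != 1 || list.children[0].repetition != Repetition::Repeated {
        return None;
    }
    // The list itself is nullable only if the outer LIST group is OPTIONAL.
    let list_nullable = list.repetition == Repetition::Optional;
    let repeated = &list.children[0];

    // Legacy two-level forms: a repeated leaf, a repeated group with several
    // fields, or a one-field group named `array` or `<list name>_tuple`.
    // In these cases the repeated field is itself the element, and it is required.
    let legacy = repeated.children.len() != 1
        || repeated.name == "array"
        || repeated.name == format!("{}_tuple", list.name);
    if legacy {
        return Some((list_nullable, repeated.name, false));
    }

    // Three-level form: the repeated group's single child is the element,
    // and the element keeps its own nullability.
    let element = &repeated.children[0];
    Some((list_nullable, element.name, element.repetition == Repetition::Optional))
}
```

Keeping the two nullability decisions separate is what lets a nullable list carry non-nullable elements, as in the corrected test.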




[GitHub] [arrow-rs] tustvold commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1125874976

   I think this is now ready; if I've missed anything, let me know. I think it is worth highlighting as a breaking change in the changelog, so that on the off chance it does break something someone was relying on (even if that was likely a bug), they know where to look and we can hopefully unblock them quickly.
   
   https://xkcd.com/1172/



[GitHub] [arrow-rs] tustvold commented on pull request #1682: Fix Parquet Arrow Schema Inference

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#issuecomment-1126306502

   @alamb, that's unfortunately expected: DataFusion has a bug. I will provide context on the ticket.

