You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/13 17:36:04 UTC

[arrow-rs] branch master updated: Fix Parquet Arrow Schema Inference (#1682)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b154ea40 Fix Parquet Arrow Schema Inference (#1682)
5b154ea40 is described below

commit 5b154ea40314dc2f09babbb363bf7f1fe439d4eb
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri May 13 18:35:56 2022 +0100

    Fix Parquet Arrow Schema Inference (#1682)
    
    * Separate parquet -> arrow conversion logic (#1655)
    
    Don't treat embedded arrow schema as authoritative (#1663)
    
    Fix projection of nested parquet files (#1652) (#1654)
    
    Fix schema inference for repeated fields (#1681)
    
    Support reading alternative list representations from parquet (#1680)
    
    * Add more tests
    
    * Pass pointers by reference
    
    * More docs
    
    * Fix lint
    
    * Review feedback
    
    * Review feedback
    
    * Fix test failures related to #1697
---
 parquet/src/arrow/array_reader/builder.rs    | 859 ++++++++-------------------
 parquet/src/arrow/array_reader/list_array.rs |  13 +-
 parquet/src/arrow/arrow_reader.rs            |  36 ++
 parquet/src/arrow/schema.rs                  | 725 ++++------------------
 parquet/src/arrow/schema/complex.rs          | 599 +++++++++++++++++++
 parquet/src/arrow/schema/primitive.rs        | 266 +++++++++
 parquet/src/errors.rs                        |   8 +
 7 files changed, 1279 insertions(+), 1227 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index af2896350..ab5800672 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef};
+use arrow::datatypes::{DataType, IntervalUnit, SchemaRef};
 
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::{
@@ -32,15 +31,14 @@ use crate::arrow::converter::{
     IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
     IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
 };
-use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
+use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
+use crate::basic::Type as PhysicalType;
 use crate::data_type::{
     BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type,
     Int96Type,
 };
-use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
-use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
-use crate::schema::visitor::TypeVisitor;
+use crate::errors::Result;
+use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
 
 /// Create array reader from parquet schema, column indices, and parquet file reader.
 pub fn build_array_reader<T>(
@@ -52,435 +50,217 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use this child
-                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
-
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
-
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
-
-        let arrow_type: Option<ArrowType> = self
-            .get_arrow_field(&cur_type, context)
-            .map(|f| f.data_type().clone());
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-        match cur_type.get_physical_type() {
-            PhysicalType::BOOLEAN => Ok(Box::new(
-                PrimitiveArrayReader::<BoolType>::new_with_options(
+    let physical_type = primitive_type.get_physical_type();
+
+    // We don't track the column path in ParquetField as it adds a potential source
+    // of bugs when the arrow mapping converts more than one level in the parquet
+    // schema into a single arrow field.
+    //
+    // None of the readers actually use this field, but it is required for this type,
+    // so just stick a placeholder in
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),
+    ));
+
+    let page_iterator = row_groups.column_chunks(col_idx)?;
+    let null_mask_only = field.def_level == 1 && field.nullable;
+    let arrow_type = Some(field.arrow_type.clone());
+
+    match physical_type {
+        PhysicalType::BOOLEAN => Ok(Box::new(
+            PrimitiveArrayReader::<BoolType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT32 => {
+            if let Some(DataType::Null) = arrow_type {
+                Ok(Box::new(NullArrayReader::<Int32Type>::new(
                     page_iterator,
                     column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT32 => {
-                if let Some(ArrowType::Null) = arrow_type {
-                    Ok(Box::new(NullArrayReader::<Int32Type>::new(
+                )?))
+            } else {
+                Ok(Box::new(
+                    PrimitiveArrayReader::<Int32Type>::new_with_options(
                         page_iterator,
                         column_desc,
-                    )?))
+                        arrow_type,
+                        null_mask_only,
+                    )?,
+                ))
+            }
+        }
+        PhysicalType::INT64 => Ok(Box::new(
+            PrimitiveArrayReader::<Int64Type>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::INT96 => {
+            // get the optional timezone information from arrow type
+            let timezone = arrow_type.as_ref().and_then(|data_type| {
+                if let DataType::Timestamp(_, tz) = data_type {
+                    tz.clone()
                 } else {
-                    Ok(Box::new(
-                        PrimitiveArrayReader::<Int32Type>::new_with_options(
-                            page_iterator,
-                            column_desc,
-                            arrow_type,
-                            null_mask_only,
-                        )?,
-                    ))
+                    None
                 }
-            }
-            PhysicalType::INT64 => Ok(Box::new(
-                PrimitiveArrayReader::<Int64Type>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::INT96 => {
-                // get the optional timezone information from arrow type
-                let timezone = arrow_type.as_ref().and_then(|data_type| {
-                    if let ArrowType::Timestamp(_, tz) = data_type {
-                        tz.clone()
-                    } else {
-                        None
-                    }
-                });
-                let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            });
+            let converter = Int96Converter::new(Int96ArrayConverter { timezone });
+            Ok(Box::new(ComplexObjectArrayReader::<
+                Int96Type,
+                Int96Converter,
+            >::new(
+                page_iterator,
+                column_desc,
+                converter,
+                arrow_type,
+            )?))
+        }
+        PhysicalType::FLOAT => Ok(Box::new(
+            PrimitiveArrayReader::<FloatType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::DOUBLE => Ok(Box::new(
+            PrimitiveArrayReader::<DoubleType>::new_with_options(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            )?,
+        )),
+        PhysicalType::BYTE_ARRAY => match arrow_type {
+            Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+            _ => make_byte_array_reader(
+                page_iterator,
+                column_desc,
+                arrow_type,
+                null_mask_only,
+            ),
+        },
+        PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
+            DataType::Decimal(precision, scale) => {
+                let converter = DecimalConverter::new(DecimalArrayConverter::new(
+                    precision as i32,
+                    scale as i32,
+                ));
                 Ok(Box::new(ComplexObjectArrayReader::<
-                    Int96Type,
-                    Int96Converter,
+                    FixedLenByteArrayType,
+                    DecimalConverter,
                 >::new(
                     page_iterator,
                     column_desc,
@@ -488,47 +268,38 @@ impl<'a> ArrayReaderBuilder {
                     arrow_type,
                 )?))
             }
-            PhysicalType::FLOAT => Ok(Box::new(
-                PrimitiveArrayReader::<FloatType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::DOUBLE => Ok(Box::new(
-                PrimitiveArrayReader::<DoubleType>::new_with_options(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                    null_mask_only,
-                )?,
-            )),
-            PhysicalType::BYTE_ARRAY => match arrow_type {
-                Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+            DataType::Interval(IntervalUnit::DayTime) => {
+                let converter =
+                    IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-                _ => make_byte_array_reader(
+                )?))
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                let converter =
+                    IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    FixedLenByteArrayType,
+                    _,
+                >::new(
                     page_iterator,
                     column_desc,
+                    converter,
                     arrow_type,
-                    null_mask_only,
-                ),
-            },
-            PhysicalType::FIXED_LEN_BYTE_ARRAY
-                if cur_type.get_basic_info().converted_type()
-                    == ConvertedType::DECIMAL =>
-            {
-                let converter = DecimalConverter::new(DecimalArrayConverter::new(
-                    cur_type.get_precision(),
-                    cur_type.get_scale(),
-                ));
+                )?))
+            }
+            _ => {
+                let converter =
+                    FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
                 Ok(Box::new(ComplexObjectArrayReader::<
                     FixedLenByteArrayType,
-                    DecimalConverter,
+                    FixedLenBinaryConverter,
                 >::new(
                     page_iterator,
                     column_desc,
@@ -536,173 +307,26 @@ impl<'a> ArrayReaderBuilder {
                     arrow_type,
                 )?))
             }
-            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                let byte_width = match *cur_type {
-                    Type::PrimitiveType {
-                        ref type_length, ..
-                    } => *type_length,
-                    _ => {
-                        return Err(ArrowError(
-                            "Expected a physical type, not a group type".to_string(),
-                        ))
-                    }
-                };
-                if cur_type.get_basic_info().converted_type() == ConvertedType::INTERVAL {
-                    if byte_width != 12 {
-                        return Err(ArrowError(format!(
-                            "Parquet interval type should have length of 12, found {}",
-                            byte_width
-                        )));
-                    }
-                    match arrow_type {
-                        Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
-                            let converter = IntervalDayTimeConverter::new(
-                                IntervalDayTimeArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
-                            let converter = IntervalYearMonthConverter::new(
-                                IntervalYearMonthArrayConverter {},
-                            );
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                FixedLenByteArrayType,
-                                _,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        Some(t) => Err(ArrowError(format!(
-                            "Cannot write a Parquet interval to {:?}",
-                            t
-                        ))),
-                        None => {
-                            // we do not support an interval not matched to an Arrow type,
-                            // because we risk data loss as we won't know which of the 12 bytes
-                            // are or should be populated
-                            Err(ArrowError(
-                                "Cannot write a Parquet interval with no Arrow type specified.
-                                There is a risk of data loss as Arrow either supports YearMonth or
-                                DayTime precision. Without the Arrow type, we cannot infer the type.
-                                ".to_string()
-                            ))
-                        }
-                    }
-                } else {
-                    let converter = FixedLenBinaryConverter::new(
-                        FixedSizeArrayConverter::new(byte_width),
-                    );
-                    Ok(Box::new(ComplexObjectArrayReader::<
-                        FixedLenByteArrayType,
-                        FixedLenBinaryConverter,
-                    >::new(
-                        page_iterator,
-                        column_desc,
-                        converter,
-                        arrow_type,
-                    )?))
-                }
-            }
-        }
-    }
-
-    /// Constructs struct array reader without considering repetition.
-    fn build_for_struct_type_inner(
-        &mut self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut fields = Vec::with_capacity(cur_type.get_fields().len());
-        let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
-
-        for child in cur_type.get_fields() {
-            if let Some(child_reader) = self.dispatch(child.clone(), context)? {
-                // TODO: this results in calling get_arrow_field twice, it could be reused
-                // from child_reader above, by making child_reader carry its `Field`
-                let mut struct_context = context.clone();
-                struct_context.path.append(vec![child.name().to_string()]);
-                let field = match self.get_arrow_field(child, &struct_context) {
-                    Some(f) => f.clone(),
-                    _ => Field::new(
-                        child.name(),
-                        child_reader.get_data_type().clone(),
-                        child.is_optional(),
-                    ),
-                };
-                fields.push(field);
-                children_reader.push(child_reader);
-            }
-        }
-
-        if !fields.is_empty() {
-            let arrow_type = ArrowType::Struct(fields);
-            Ok(Some(Box::new(StructArrayReader::new(
-                arrow_type,
-                children_reader,
-                context.def_level,
-                context.rep_level,
-            ))))
-        } else {
-            Ok(None)
-        }
+        },
     }
+}
 
-    fn get_arrow_field(
-        &self,
-        cur_type: &Type,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Option<&Field> {
-        let parts: Vec<&str> = context
-            .path
-            .parts()
-            .iter()
-            .map(|x| -> &str { x })
-            .collect::<Vec<&str>>();
-
-        // If the parts length is one it'll have the top level "schema" type. If
-        // it's two then it'll be a top-level type that we can get from the arrow
-        // schema directly.
-        if parts.len() <= 2 {
-            self.arrow_schema.field_with_name(cur_type.name()).ok()
-        } else {
-            // If it's greater than two then we need to traverse the type path
-            // until we find the actual field we're looking for.
-            let mut field: Option<&Field> = None;
-
-            for (i, part) in parts.iter().enumerate().skip(1) {
-                if i == 1 {
-                    field = self.arrow_schema.field_with_name(part).ok();
-                } else if let Some(f) = field {
-                    match f.data_type() {
-                        ArrowType::Struct(fields) => {
-                            field = fields.iter().find(|f| f.name() == part)
-                        }
-                        ArrowType::List(list_field) => match list_field.data_type() {
-                            ArrowType::Struct(fields) => {
-                                field = fields.iter().find(|f| f.name() == part)
-                            }
-                            _ => field = Some(list_field.as_ref()),
-                        },
-                        _ => field = None,
-                    }
-                } else {
-                    field = None;
-                }
-            }
-            field
-        }
-    }
+fn build_struct_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    let children_reader = children
+        .iter()
+        .map(|child| build_reader(child, row_groups))
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(Box::new(StructArrayReader::new(
+        field.arrow_type.clone(),
+        children_reader,
+        field.def_level,
+        field.rep_level,
+    )) as _)
 }
 
 #[cfg(test)]
@@ -711,6 +335,7 @@ mod tests {
     use crate::arrow::parquet_to_arrow_schema;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::util::test_common::get_test_file;
+    use arrow::datatypes::Field;
     use std::sync::Arc;
 
     #[test]
@@ -735,9 +360,9 @@ mod tests {
         .unwrap();
 
         // Create arrow types
-        let arrow_type = ArrowType::Struct(vec![Field::new(
+        let arrow_type = DataType::Struct(vec![Field::new(
             "b_struct",
-            ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]),
+            DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)]),
             true,
         )]);
 
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 5b7d4865f..31796d79b 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -584,16 +584,25 @@ mod tests {
 
         let mut array_reader = build_array_reader(
             file_reader.metadata().file_metadata().schema_descr_ptr(),
-            Arc::new(arrow_schema.clone()),
+            Arc::new(arrow_schema),
             vec![0usize].into_iter(),
             Box::new(file_reader),
         )
         .unwrap();
 
         let batch = array_reader.next_batch(100).unwrap();
+        assert_eq!(batch.data_type(), array_reader.get_data_type());
         assert_eq!(
             batch.data_type(),
-            &ArrowType::Struct(arrow_schema.fields().clone())
+            &ArrowType::Struct(vec![Field::new(
+                "table_info",
+                ArrowType::List(Box::new(Field::new(
+                    "table_info",
+                    ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)]),
+                    false
+                ))),
+                false
+            )])
         );
         assert_eq!(batch.len(), 0);
     }
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 7675707e3..0c4ded90b 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -1050,6 +1050,42 @@ mod tests {
         for batch in record_batch_reader {
             batch.unwrap();
         }
+
+        let projected_reader = arrow_reader
+            .get_record_reader_by_columns(vec![3, 8, 10], 60)
+            .unwrap();
+        let projected_schema = arrow_reader
+            .get_schema_by_columns(vec![3, 8, 10], true)
+            .unwrap();
+
+        let expected_schema = Schema::new(vec![
+            Field::new(
+                "roll_num",
+                ArrowDataType::Struct(vec![Field::new(
+                    "count",
+                    ArrowDataType::UInt64,
+                    false,
+                )]),
+                false,
+            ),
+            Field::new(
+                "PC_CUR",
+                ArrowDataType::Struct(vec![
+                    Field::new("mean", ArrowDataType::Int64, false),
+                    Field::new("sum", ArrowDataType::Int64, false),
+                ]),
+                false,
+            ),
+        ]);
+
+        // Tests for #1652 and #1654
+        assert_eq!(projected_reader.schema().as_ref(), &projected_schema);
+        assert_eq!(expected_schema, projected_schema);
+
+        for batch in projected_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch.schema().as_ref(), &projected_schema);
+        }
     }
 
     #[test]
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index bfccfe7f9..71184e0b6 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -23,22 +23,24 @@
 //!
 //! The interfaces for converting arrow schema to parquet schema is coming.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::sync::Arc;
 
-use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
+use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use arrow::ipc::writer;
 
+use crate::basic::{
+    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit,
+    Type as PhysicalType,
+};
 use crate::errors::{ParquetError::ArrowError, Result};
 use crate::file::{metadata::KeyValue, properties::WriterProperties};
 use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
-use crate::{
-    basic::{
-        ConvertedType, LogicalType, Repetition,
-        TimeUnit as ParquetTimeUnit, Type as PhysicalType,
-    },
-    errors::ParquetError,
-};
+
+mod complex;
+mod primitive;
+
+pub(crate) use complex::{convert_schema, ParquetField, ParquetFieldType};
 
 /// Convert Parquet schema to Arrow schema including optional metadata.
 /// Attempts to decode any existing Arrow schema metadata, falling back
@@ -47,15 +49,11 @@ pub fn parquet_to_arrow_schema(
     parquet_schema: &SchemaDescriptor,
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
-    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
-    metadata
-        .remove(super::ARROW_SCHEMA_META_KEY)
-        .map(|encoded| get_arrow_schema_from_metadata(&encoded))
-        .unwrap_or(parquet_to_arrow_schema_by_columns(
-            parquet_schema,
-            0..parquet_schema.columns().len(),
-            key_value_metadata,
-        ))
+    parquet_to_arrow_schema_by_columns(
+        parquet_schema,
+        0..parquet_schema.columns().len(),
+        key_value_metadata,
+    )
 }
 
 /// Convert parquet schema to arrow schema including optional metadata,
@@ -122,58 +120,25 @@ where
     T: IntoIterator<Item = usize>,
 {
     let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
-    let arrow_schema_metadata = metadata
+    let maybe_schema = metadata
         .remove(super::ARROW_SCHEMA_META_KEY)
-        .map(|encoded| get_arrow_schema_from_metadata(&encoded))
-        .map_or(Ok(None), |v| v.map(Some))?;
+        .map(|value| get_arrow_schema_from_metadata(&value))
+        .transpose()?;
 
-    // add the Arrow metadata to the Parquet metadata
-    if let Some(arrow_schema) = &arrow_schema_metadata {
+    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
+    if let Some(arrow_schema) = &maybe_schema {
         arrow_schema.metadata().iter().for_each(|(k, v)| {
-            metadata.insert(k.clone(), v.clone());
+            metadata.entry(k.clone()).or_insert(v.clone());
         });
     }
 
-    let mut base_nodes = Vec::new();
-    let mut base_nodes_set = HashSet::new();
-    let mut leaves = HashSet::new();
-
-    enum FieldType<'a> {
-        Parquet(&'a Type),
-        Arrow(Field),
-    }
-
-    for c in column_indices {
-        let column = parquet_schema.column(c);
-        let name = column.name();
-
-        if let Some(field) = arrow_schema_metadata
-            .as_ref()
-            .and_then(|schema| schema.field_with_name(name).ok().cloned())
-        {
-            base_nodes.push(FieldType::Arrow(field));
-        } else {
-            let column = column.self_type() as *const Type;
-            let root = parquet_schema.get_column_root(c);
-            let root_raw_ptr = root as *const Type;
-
-            leaves.insert(column);
-            if !base_nodes_set.contains(&root_raw_ptr) {
-                base_nodes.push(FieldType::Parquet(root));
-                base_nodes_set.insert(root_raw_ptr);
-            }
-        }
+    match convert_schema(parquet_schema, column_indices, maybe_schema.as_ref())? {
+        Some(field) => match field.arrow_type {
+            DataType::Struct(fields) => Ok(Schema::new_with_metadata(fields, metadata)),
+            _ => unreachable!(),
+        },
+        None => Ok(Schema::new_with_metadata(vec![], metadata)),
     }
-
-    base_nodes
-        .into_iter()
-        .map(|t| match t {
-            FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(),
-            FieldType::Arrow(f) => Ok(Some(f)),
-        })
-        .collect::<Result<Vec<Option<Field>>>>()
-        .map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
-        .map(|fields| Schema::new_with_metadata(fields, metadata))
 }
 
 /// Try to convert Arrow schema metadata into a schema
@@ -299,14 +264,13 @@ fn parse_key_value_metadata(
 
 /// Convert parquet column schema to arrow field.
 pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
-    let schema = parquet_column.self_type();
-
-    let mut leaves = HashSet::new();
-    leaves.insert(parquet_column.self_type() as *const Type);
+    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
 
-    ParquetTypeConverter::new(schema, &leaves)
-        .to_field()
-        .map(|opt| opt.unwrap())
+    Ok(Field::new(
+        parquet_column.name(),
+        field.arrow_type,
+        field.nullable,
+    ))
 }
 
 pub fn decimal_length_from_precision(precision: usize) -> usize {
@@ -385,34 +349,54 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
             .with_repetition(repetition)
             .build(),
-        DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
-            name,
-            PhysicalType::INT64,
-        )
-        .with_logical_type(Some(LogicalType::Timestamp {
-            is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
-            unit: match time_unit {
-                TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
-                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
-                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
-            },
-        }))
-        .with_repetition(repetition)
-        .build(),
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_repetition(repetition)
+                .build()
+        }
+        DataType::Timestamp(time_unit, _) => {
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_logical_type(Some(LogicalType::Timestamp {
+                    is_adjusted_to_u_t_c: false,
+                    unit: match time_unit {
+                        TimeUnit::Second => unreachable!(),
+                        TimeUnit::Millisecond => {
+                            ParquetTimeUnit::MILLIS(Default::default())
+                        }
+                        TimeUnit::Microsecond => {
+                            ParquetTimeUnit::MICROS(Default::default())
+                        }
+                        TimeUnit::Nanosecond => {
+                            ParquetTimeUnit::NANOS(Default::default())
+                        }
+                    },
+                }))
+                .with_repetition(repetition)
+                .build()
+        }
         DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        // date64 is cast to date32
+        // date64 is cast to date32 (#1666)
         DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
             .build(),
-        DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
+        DataType::Time32(TimeUnit::Second) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT32)
+                .with_repetition(repetition)
+                .build()
+        }
+        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Time {
                 is_adjusted_to_u_t_c: false,
-                unit: ParquetTimeUnit::MILLIS(Default::default()),
+                unit: match unit {
+                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
+                    u => unreachable!("Invalid unit for Time32: {:?}", u),
+                },
             }))
             .with_repetition(repetition)
             .build(),
@@ -544,502 +528,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         }
     }
 }
-/// This struct is used to group methods and data structures used to convert parquet
-/// schema together.
-struct ParquetTypeConverter<'a> {
-    schema: &'a Type,
-    /// This is the columns that need to be converted to arrow schema.
-    columns_to_convert: &'a HashSet<*const Type>,
-}
-
-impl<'a> ParquetTypeConverter<'a> {
-    fn new(schema: &'a Type, columns_to_convert: &'a HashSet<*const Type>) -> Self {
-        Self {
-            schema,
-            columns_to_convert,
-        }
-    }
-
-    fn clone_with_schema(&self, other: &'a Type) -> Self {
-        Self {
-            schema: other,
-            columns_to_convert: self.columns_to_convert,
-        }
-    }
-}
-
-impl ParquetTypeConverter<'_> {
-    // Public interfaces.
-
-    /// Converts parquet schema to arrow data type.
-    ///
-    /// This function discards schema name.
-    ///
-    /// If this schema is a primitive type and not included in the leaves, the result is
-    /// Ok(None).
-    ///
-    /// If this schema is a group type and none of its children is reserved in the
-    /// conversion, the result is Ok(None).
-    fn to_data_type(&self) -> Result<Option<DataType>> {
-        match self.schema {
-            Type::PrimitiveType { .. } => self.to_primitive_type(),
-            Type::GroupType { .. } => self.to_group_type(),
-        }
-    }
-
-    /// Converts parquet schema to arrow field.
-    ///
-    /// This method is roughly the same as
-    /// [`to_data_type`](`ParquetTypeConverter::to_data_type`), except it reserves schema
-    /// name.
-    fn to_field(&self) -> Result<Option<Field>> {
-        self.to_data_type().map(|opt| {
-            opt.map(|dt| Field::new(self.schema.name(), dt, self.is_nullable()))
-        })
-    }
-
-    // Utility functions.
-
-    /// Checks whether this schema is nullable.
-    fn is_nullable(&self) -> bool {
-        let basic_info = self.schema.get_basic_info();
-        if basic_info.has_repetition() {
-            match basic_info.repetition() {
-                Repetition::OPTIONAL => true,
-                Repetition::REPEATED => true,
-                Repetition::REQUIRED => false,
-            }
-        } else {
-            false
-        }
-    }
-
-    fn is_repeated(&self) -> bool {
-        let basic_info = self.schema.get_basic_info();
-
-        basic_info.has_repetition() && basic_info.repetition() == Repetition::REPEATED
-    }
-
-    fn is_self_included(&self) -> bool {
-        self.columns_to_convert
-            .contains(&(self.schema as *const Type))
-    }
-
-    // Functions for primitive types.
-
-    /// Entry point for converting parquet primitive type to arrow type.
-    ///
-    /// This function takes care of repetition.
-    fn to_primitive_type(&self) -> Result<Option<DataType>> {
-        if self.is_self_included() {
-            self.to_primitive_type_inner().map(|dt| {
-                if self.is_repeated() {
-                    Some(DataType::List(Box::new(Field::new(
-                        self.schema.name(),
-                        dt,
-                        self.is_nullable(),
-                    ))))
-                } else {
-                    Some(dt)
-                }
-            })
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Converting parquet primitive type to arrow data type.
-    fn to_primitive_type_inner(&self) -> Result<DataType> {
-        match self.schema.get_physical_type() {
-            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
-            PhysicalType::INT32 => self.from_int32(),
-            PhysicalType::INT64 => self.from_int64(),
-            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
-            PhysicalType::FLOAT => Ok(DataType::Float32),
-            PhysicalType::DOUBLE => Ok(DataType::Float64),
-            PhysicalType::BYTE_ARRAY => self.from_byte_array(),
-            PhysicalType::FIXED_LEN_BYTE_ARRAY => self.from_fixed_len_byte_array(),
-        }
-    }
-
-    #[allow(clippy::wrong_self_convention)]
-    fn from_int32(&self) -> Result<DataType> {
-        match (
-            self.schema.get_basic_info().logical_type(),
-            self.schema.get_basic_info().converted_type(),
-        ) {
-            (None, ConvertedType::NONE) => Ok(DataType::Int32),
-            (Some(ref t @ LogicalType::Integer {
-                bit_width,
-                is_signed,
-            }), _) => match (bit_width, is_signed) {
-                (8, true) => Ok(DataType::Int8),
-                (16, true) => Ok(DataType::Int16),
-                (32, true) => Ok(DataType::Int32),
-                (8, false) => Ok(DataType::UInt8),
-                (16, false) => Ok(DataType::UInt16),
-                (32, false) => Ok(DataType::UInt32),
-                _ => Err(ArrowError(format!(
-                    "Cannot create INT32 physical type from {:?}",
-                    t,
-                ))),
-            },
-            (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
-            (Some(LogicalType::Date), _) => Ok(DataType::Date32),
-            (Some(LogicalType::Time { unit, .. }), _) => match unit {
-                ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
-                _ => Err(ArrowError(format!(
-                    "Cannot create INT32 physical type from {:?}",
-                    unit
-                ))),
-            },
-            // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
-            (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
-            (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
-            (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
-            (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
-            (None, ConvertedType::INT_8) => Ok(DataType::Int8),
-            (None, ConvertedType::INT_16) => Ok(DataType::Int16),
-            (None, ConvertedType::INT_32) => Ok(DataType::Int32),
-            (None, ConvertedType::DATE) => Ok(DataType::Date32),
-            (None, ConvertedType::TIME_MILLIS) => {
-                Ok(DataType::Time32(TimeUnit::Millisecond))
-            }
-            (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
-            (logical, converted) => Err(ArrowError(format!(
-                "Unable to convert parquet INT32 logical type {:?} or converted type {}",
-                logical, converted
-            ))),
-        }
-    }
-
-    #[allow(clippy::wrong_self_convention)]
-    fn from_int64(&self) -> Result<DataType> {
-        match (
-            self.schema.get_basic_info().logical_type(),
-            self.schema.get_basic_info().converted_type(),
-        ) {
-            (None, ConvertedType::NONE) => Ok(DataType::Int64),
-            (Some(LogicalType::Integer { bit_width, is_signed }), _) if bit_width == 64 => {
-                match is_signed {
-                    true => Ok(DataType::Int64),
-                    false => Ok(DataType::UInt64),
-                }
-            }
-            (Some(LogicalType::Time { unit, .. }), _) => match unit {
-                ParquetTimeUnit::MILLIS(_) => Err(ArrowError(
-                    "Cannot create INT64 from MILLIS time unit".to_string(),
-                )),
-                ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
-                ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
-            },
-            (Some(LogicalType::Timestamp { is_adjusted_to_u_t_c, unit }), _) => Ok(DataType::Timestamp(
-                match unit {
-                    ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
-                    ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
-                    ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
-                },
-                if is_adjusted_to_u_t_c {
-                    Some("UTC".to_string())
-                } else {
-                    None
-                },
-            )),
-            (None, ConvertedType::INT_64) => Ok(DataType::Int64),
-            (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
-            (None, ConvertedType::TIME_MICROS) => {
-                Ok(DataType::Time64(TimeUnit::Microsecond))
-            }
-            (None, ConvertedType::TIMESTAMP_MILLIS) => {
-                Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
-            }
-            (None, ConvertedType::TIMESTAMP_MICROS) => {
-                Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
-            }
-            (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
-            (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
-            (logical, converted) => Err(ArrowError(format!(
-                "Unable to convert parquet INT64 logical type {:?} or converted type {}",
-                logical, converted
-            ))),
-        }
-    }
-
-    #[allow(clippy::wrong_self_convention)]
-    fn from_fixed_len_byte_array(&self) -> Result<DataType> {
-        match (
-            self.schema.get_basic_info().logical_type(),
-            self.schema.get_basic_info().converted_type(),
-        ) {
-            (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
-            (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
-            (None, ConvertedType::INTERVAL) => {
-                // There is currently no reliable way of determining which IntervalUnit
-                // to return. Thus without the original Arrow schema, the results
-                // would be incorrect if all 12 bytes of the interval are populated
-                Ok(DataType::Interval(IntervalUnit::DayTime))
-            }
-            _ => {
-                let byte_width = match self.schema {
-                    Type::PrimitiveType {
-                        ref type_length, ..
-                    } => *type_length,
-                    _ => {
-                        return Err(ArrowError(
-                            "Expected a physical type, not a group type".to_string(),
-                        ))
-                    }
-                };
-
-                Ok(DataType::FixedSizeBinary(byte_width))
-            }
-        }
-    }
-
-    fn to_decimal(&self) -> DataType {
-        assert!(self.schema.is_primitive());
-        DataType::Decimal(
-            self.schema.get_precision() as usize,
-            self.schema.get_scale() as usize,
-        )
-    }
-
-    #[allow(clippy::wrong_self_convention)]
-    fn from_byte_array(&self) -> Result<DataType> {
-        match (self.schema.get_basic_info().logical_type(), self.schema.get_basic_info().converted_type()) {
-            (Some(LogicalType::String), _) => Ok(DataType::Utf8),
-            (Some(LogicalType::Json), _) => Ok(DataType::Binary),
-            (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
-            (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
-            (None, ConvertedType::NONE) => Ok(DataType::Binary),
-            (None, ConvertedType::JSON) => Ok(DataType::Binary),
-            (None, ConvertedType::BSON) => Ok(DataType::Binary),
-            (None, ConvertedType::ENUM) => Ok(DataType::Binary),
-            (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
-            (logical, converted) => Err(ArrowError(format!(
-                "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
-                logical, converted
-            ))),
-        }
-    }
-
-    // Functions for group types.
-
-    /// Entry point for converting parquet group type.
-    ///
-    /// This function takes care of logical type and repetition.
-    fn to_group_type(&self) -> Result<Option<DataType>> {
-        match (
-            self.schema.get_basic_info().logical_type(),
-            self.schema.get_basic_info().converted_type(),
-        ) {
-            (Some(LogicalType::List), _) | (_, ConvertedType::LIST) => self.to_list(),
-            (Some(LogicalType::Map), _)
-            | (_, ConvertedType::MAP)
-            | (_, ConvertedType::MAP_KEY_VALUE) => self.to_map(),
-            (_, _) => {
-                if self.is_repeated() {
-                    self.to_struct().map(|opt| {
-                        opt.map(|dt| {
-                            DataType::List(Box::new(Field::new(
-                                self.schema.name(),
-                                dt,
-                                self.is_nullable(),
-                            )))
-                        })
-                    })
-                } else {
-                    self.to_struct()
-                }
-            }
-        }
-    }
-
-    /// Converts a parquet group type to arrow struct.
-    fn to_struct(&self) -> Result<Option<DataType>> {
-        match self.schema {
-            Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
-                "{:?} is a struct type, and can't be processed as primitive.",
-                self.schema
-            ))),
-            Type::GroupType {
-                basic_info: _,
-                fields,
-            } => fields
-                .iter()
-                .map(|field_ptr| self.clone_with_schema(field_ptr).to_field())
-                .collect::<Result<Vec<Option<Field>>>>()
-                .map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
-                .map(|fields| {
-                    if fields.is_empty() {
-                        None
-                    } else {
-                        Some(DataType::Struct(fields))
-                    }
-                }),
-        }
-    }
-
-    /// Converts a parquet list to arrow list.
-    ///
-    /// To fully understand this algorithm, please refer to
-    /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
-    fn to_list(&self) -> Result<Option<DataType>> {
-        match self.schema {
-            Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
-                "{:?} is a list type and can't be processed as primitive.",
-                self.schema
-            ))),
-            Type::GroupType {
-                basic_info: _,
-                fields,
-            } if fields.len() == 1 => {
-                let list_item = fields.first().unwrap();
-                let item_converter = self.clone_with_schema(list_item);
-
-                let item_type = match list_item.as_ref() {
-                    Type::PrimitiveType { .. } => {
-                        if item_converter.is_repeated() {
-                            item_converter.to_primitive_type_inner().map(Some)
-                        } else {
-                            Err(ArrowError(
-                                "Primitive element type of list must be repeated."
-                                    .to_string(),
-                            ))
-                        }
-                    }
-                    Type::GroupType {
-                        basic_info: _,
-                        fields,
-                    } => {
-                        if fields.len() > 1 {
-                            item_converter.to_struct()
-                        } else if fields.len() == 1
-                            && list_item.name() != "array"
-                            && list_item.name() != format!("{}_tuple", self.schema.name())
-                        {
-                            let nested_item = fields.first().unwrap();
-                            let nested_item_converter =
-                                self.clone_with_schema(nested_item);
-
-                            nested_item_converter.to_data_type()
-                        } else {
-                            item_converter.to_struct()
-                        }
-                    }
-                };
-
-                // Check that the name of the list child is "list", in which case we
-                // get the child nullability and name (normally "element") from the nested
-                // group type.
-                // Without this step, the child incorrectly inherits the parent's optionality
-                let (list_item_name, item_is_optional) = match &item_converter.schema {
-                    Type::GroupType { basic_info, fields }
-                        if basic_info.name() == "list" && fields.len() == 1 =>
-                    {
-                        let field = fields.first().unwrap();
-                        (field.name(), field.is_optional())
-                    }
-                    _ => (list_item.name(), list_item.is_optional()),
-                };
-
-                item_type.map(|opt| {
-                    opt.map(|dt| {
-                        DataType::List(Box::new(Field::new(
-                            list_item_name,
-                            dt,
-                            item_is_optional,
-                        )))
-                    })
-                })
-            }
-            _ => Err(ArrowError(
-                "Group element type of list can only contain one field.".to_string(),
-            )),
-        }
-    }
-
-    /// Converts a parquet map to arrow map.
-    ///
-    /// To fully understand this algorithm, please refer to
-    /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
-    fn to_map(&self) -> Result<Option<DataType>> {
-        match self.schema {
-            Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
-                "{:?} is a map type and can't be processed as primitive.",
-                self.schema
-            ))),
-            Type::GroupType {
-                basic_info: _,
-                fields,
-            } if fields.len() == 1 => {
-                let key_item = fields.first().unwrap();
-
-                let (key_type, value_type) = match key_item.as_ref() {
-                    Type::PrimitiveType { .. } => {
-                        return Err(ArrowError(
-                            "A map can only have a group child type (key_values)."
-                                .to_string(),
-                        ))
-                    }
-                    Type::GroupType {
-                        basic_info: _,
-                        fields,
-                    } => {
-                        if fields.len() != 2 {
-                            return Err(ArrowError(format!("Map type should have 2 fields, a key and value. Found {} fields", fields.len())));
-                        } else {
-                            let nested_key = fields.first().unwrap();
-                            let nested_key_converter = self.clone_with_schema(nested_key);
-
-                            let nested_value = fields.last().unwrap();
-                            let nested_value_converter =
-                                self.clone_with_schema(nested_value);
-
-                            (
-                                nested_key_converter.to_data_type()?.map(|d| {
-                                    Field::new(
-                                        nested_key.name(),
-                                        d,
-                                        nested_key.is_optional(),
-                                    )
-                                }),
-                                nested_value_converter.to_data_type()?.map(|d| {
-                                    Field::new(
-                                        nested_value.name(),
-                                        d,
-                                        nested_value.is_optional(),
-                                    )
-                                }),
-                            )
-                        }
-                    }
-                };
-
-                match (key_type, value_type) {
-                    (Some(key), Some(value)) => Ok(Some(DataType::Map(
-                        Box::new(Field::new(
-                            key_item.name(),
-                            DataType::Struct(vec![key, value]),
-                            self.schema.is_optional(),
-                        )),
-                        false, // There is no information to tell if keys are sorted
-                    ))),
-                    (None, None) => Ok(None),
-                    (None, Some(_)) => Err(ArrowError(
-                        "Could not convert the map key to a valid datatype".to_string(),
-                    )),
-                    (Some(_), None) => Err(ArrowError(
-                        "Could not convert the map value to a valid datatype".to_string(),
-                    )),
-                }
-            }
-            _ => Err(ArrowError(
-                "Group element type of map can only contain one field.".to_string(),
-            )),
-        }
-    }
-}
 
 #[cfg(test)]
 mod tests {
@@ -1261,7 +749,7 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
+                DataType::List(Box::new(Field::new("str", DataType::Utf8, false))),
                 true,
             ));
         }
@@ -1273,7 +761,7 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", DataType::Int32, true))),
+                DataType::List(Box::new(Field::new("element", DataType::Int32, false))),
                 true,
             ));
         }
@@ -1292,7 +780,7 @@ mod tests {
             ]);
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("element", arrow_struct, true))),
+                DataType::List(Box::new(Field::new("element", arrow_struct, false))),
                 true,
             ));
         }
@@ -1309,7 +797,7 @@ mod tests {
                 DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]);
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("array", arrow_struct, true))),
+                DataType::List(Box::new(Field::new("array", arrow_struct, false))),
                 true,
             ));
         }
@@ -1326,7 +814,11 @@ mod tests {
                 DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]);
             arrow_fields.push(Field::new(
                 "my_list",
-                DataType::List(Box::new(Field::new("my_list_tuple", arrow_struct, true))),
+                DataType::List(Box::new(Field::new(
+                    "my_list_tuple",
+                    arrow_struct,
+                    false,
+                ))),
                 true,
             ));
         }
@@ -1336,8 +828,8 @@ mod tests {
         {
             arrow_fields.push(Field::new(
                 "name",
-                DataType::List(Box::new(Field::new("name", DataType::Int32, true))),
-                true,
+                DataType::List(Box::new(Field::new("name", DataType::Int32, false))),
+                false,
             ));
         }
 
@@ -1350,7 +842,7 @@ mod tests {
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
         for i in 0..arrow_fields.len() {
-            assert_eq!(arrow_fields[i], converted_fields[i]);
+            assert_eq!(arrow_fields[i], converted_fields[i], "{}", i);
         }
     }
 
@@ -1503,7 +995,7 @@ mod tests {
                             Field::new("str", DataType::Utf8, false),
                             Field::new("num", DataType::Int32, false),
                         ]),
-                        true,
+                        false, // (#1697)
                     )),
                     false,
                 ),
@@ -1528,7 +1020,7 @@ mod tests {
                             Field::new("key", DataType::Utf8, false),
                             Field::new("value", DataType::Int32, true),
                         ]),
-                        true,
+                        false, // (#1697)
                     )),
                     false,
                 ),
@@ -1636,21 +1128,39 @@ mod tests {
         for i in 0..arrow_fields.len() {
             assert_eq!(arrow_fields[i], converted_fields[i]);
         }
+
+        let err =
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 2, 4], None)
+                .unwrap_err()
+                .to_string();
+
+        assert!(
+            err.contains("out of order projection is not supported"),
+            "{}",
+            err
+        );
+
+        let err =
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 3, 4], None)
+                .unwrap_err()
+                .to_string();
+
+        assert!(err.contains("repeated column projection is not supported, column 3 appeared multiple times"), "{}", err);
     }
 
     #[test]
     fn test_nested_schema_partial_ordering() {
         let mut arrow_fields = Vec::new();
         {
+            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)];
+            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
+            arrow_fields.push(group1);
+
             let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)];
             let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
             arrow_fields.push(group2);
 
             arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
-
-            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)];
-            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
-            arrow_fields.push(group1);
         }
 
         let message_type = "
@@ -1679,7 +1189,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)
                 .unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
@@ -1700,9 +1210,9 @@ mod tests {
                 DataType::List(Box::new(Field::new(
                     "innerGroup",
                     DataType::Struct(vec![Field::new("leaf3", DataType::Int32, true)]),
-                    true,
+                    false,
                 ))),
-                true,
+                false,
             );
 
             let outer_group_list = Field::new(
@@ -1713,9 +1223,9 @@ mod tests {
                         Field::new("leaf2", DataType::Int32, true),
                         inner_group_list,
                     ]),
-                    true,
+                    false,
                 ))),
-                true,
+                false,
             );
             arrow_fields.push(outer_group_list);
         }
@@ -1790,8 +1300,8 @@ mod tests {
             Field::new("string", DataType::Utf8, true),
             Field::new(
                 "bools",
-                DataType::List(Box::new(Field::new("bools", DataType::Boolean, true))),
-                true,
+                DataType::List(Box::new(Field::new("bools", DataType::Boolean, false))),
+                false,
             ),
             Field::new("date", DataType::Date32, true),
             Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
@@ -2099,7 +1609,7 @@ mod tests {
                                     true,
                                 ),
                             ]),
-                            true,
+                            false, // #1697
                         )),
                         false, // fails to roundtrip keys_sorted
                     ),
@@ -2122,7 +1632,7 @@ mod tests {
                                     true,
                                 ),
                             ]),
-                            true,
+                            false, // #1697
                         )),
                         false, // fails to roundtrip keys_sorted
                     ),
@@ -2179,7 +1689,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "Roundtrip of lists currently fails because we don't check their types correctly in the Arrow schema"]
     fn test_arrow_schema_roundtrip_lists() -> Result<()> {
         let metadata: HashMap<String, String> =
             [("Key".to_string(), "Value".to_string())]
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
new file mode 100644
index 000000000..31a9d6e82
--- /dev/null
+++ b/parquet/src/arrow/schema/complex.rs
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema::primitive::convert_primitive;
+use crate::basic::{ConvertedType, Repetition};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
+use arrow::datatypes::{DataType, Field, Schema};
+
+fn get_repetition(t: &Type) -> Repetition {
+    let info = t.get_basic_info();
+    match info.has_repetition() {
+        true => info.repetition(),
+        false => Repetition::REQUIRED,
+    }
+}
+
+/// Representation of a parquet file, in terms of arrow schema elements
+pub struct ParquetField {
+    /// The level which represents an insertion into the current list
+    /// i.e. guaranteed to be > 0 for a list type
+    pub rep_level: i16,
+    /// The level at which this field is fully defined,
+    /// i.e. guaranteed to be > 0 for a nullable type
+    pub def_level: i16,
+    /// Whether this field is nullable
+    pub nullable: bool,
+    /// The arrow type of the column data
+    ///
+    /// Note: In certain cases the data stored in parquet may have been coerced
+    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
+    pub arrow_type: DataType,
+    /// The type of this field
+    pub field_type: ParquetFieldType,
+}
+
+impl ParquetField {
+    /// Converts `self` into an arrow list, with its current type as the field type
+    ///
+    /// This is used to convert repeated columns, into their arrow representation
+    fn into_list(self, name: &str) -> Self {
+        ParquetField {
+            rep_level: self.rep_level,
+            def_level: self.def_level,
+            nullable: false,
+            arrow_type: DataType::List(Box::new(Field::new(
+                name,
+                self.arrow_type.clone(),
+                false,
+            ))),
+            field_type: ParquetFieldType::Group {
+                children: vec![self],
+            },
+        }
+    }
+
+    /// Returns a list of [`ParquetField`] children if this is a group type
+    pub fn children(&self) -> Option<&[Self]> {
+        match &self.field_type {
+            ParquetFieldType::Primitive { .. } => None,
+            ParquetFieldType::Group { children } => Some(children),
+        }
+    }
+}
+
+pub enum ParquetFieldType {
+    Primitive {
+        /// The index of the column in parquet
+        col_idx: usize,
+        /// The type of the column in parquet
+        primitive_type: TypePtr,
+    },
+    Group {
+        children: Vec<ParquetField>,
+    },
+}
+
+/// Encodes the context of the parent of the field currently under consideration
+struct VisitorContext {
+    rep_level: i16,
+    def_level: i16,
+    /// An optional [`DataType`] sourced from the embedded arrow schema
+    data_type: Option<DataType>,
+}
+
+impl VisitorContext {
+    /// Compute the resulting definition level, repetition level and nullability
+    /// for a child field with the given [`Repetition`]
+    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
+        match repetition {
+            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
+            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
+            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
+        }
+    }
+}
+
+/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
+///
+/// See [Logical Types] for more information on the conversion algorithm
+///
+/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+struct Visitor {
+    /// The column index of the next leaf column
+    next_col_idx: usize,
+
+    /// Mask of columns to include
+    column_mask: Vec<bool>,
+}
+
+impl Visitor {
+    fn visit_primitive(
+        &mut self,
+        primitive_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let col_idx = self.next_col_idx;
+        self.next_col_idx += 1;
+
+        if !self.column_mask[col_idx] {
+            return Ok(None);
+        }
+
+        let repetition = get_repetition(primitive_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
+
+        let primitive_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type,
+            field_type: ParquetFieldType::Primitive {
+                primitive_type: primitive_type.clone(),
+                col_idx,
+            },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
+            _ => primitive_field,
+        }))
+    }
+
+    fn visit_struct(
+        &mut self,
+        struct_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // The root type will not have a repetition level
+        let repetition = get_repetition(struct_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let parquet_fields = struct_type.get_fields();
+
+        // Extract the arrow fields
+        let arrow_fields = match &context.data_type {
+            Some(DataType::Struct(fields)) => {
+                if fields.len() != parquet_fields.len() {
+                    return Err(arrow_err!(
+                        "incompatible arrow schema, expected {} struct fields got {}",
+                        parquet_fields.len(),
+                        fields.len()
+                    ));
+                }
+                Some(fields)
+            }
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected struct got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        let mut child_fields = Vec::with_capacity(parquet_fields.len());
+        let mut children = Vec::with_capacity(parquet_fields.len());
+
+        // Perform a DFS of children
+        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
+            let data_type = match arrow_fields {
+                Some(fields) => {
+                    let field = &fields[idx];
+                    if field.name() != parquet_field.name() {
+                        return Err(arrow_err!(
+                            "incompatible arrow schema, expected field named {} got {}",
+                            parquet_field.name(),
+                            field.name()
+                        ));
+                    }
+                    Some(field.data_type().clone())
+                }
+                None => None,
+            };
+
+            let arrow_field = arrow_fields.map(|x| &x[idx]);
+            let child_ctx = VisitorContext {
+                rep_level,
+                def_level,
+                data_type,
+            };
+
+            if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
+                // The child type returned may be different from what is encoded in the arrow
+                // schema in the event of a mismatch or a projection
+                child_fields.push(convert_field(parquet_field, &child, arrow_field));
+                children.push(child);
+            }
+        }
+
+        if children.is_empty() {
+            return Ok(None);
+        }
+
+        let struct_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type: DataType::Struct(child_fields),
+            field_type: ParquetFieldType::Group { children },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
+            _ => struct_field,
+        }))
+    }
+
+    fn visit_map(
+        &mut self,
+        map_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let rep_level = context.rep_level + 1;
+        let (def_level, nullable) = match get_repetition(map_type) {
+            Repetition::REQUIRED => (context.def_level + 1, false),
+            Repetition::OPTIONAL => (context.def_level + 2, true),
+            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
+        };
+
+        if map_type.get_fields().len() != 1 {
+            return Err(arrow_err!(
+                "Map field must have exactly one key_value child, found {}",
+                map_type.get_fields().len()
+            ));
+        }
+
+        // Add map entry (key_value) to context
+        let map_key_value = &map_type.get_fields()[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(arrow_err!("Child of map field must be repeated"));
+        }
+
+        if map_key_value.get_fields().len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(arrow_err!(
+                "Child of map field must have two children, found {}",
+                map_key_value.get_fields().len()
+            ));
+        }
+
+        // Get key and value, and create context for each
+        let map_key = &map_key_value.get_fields()[0];
+        let map_value = &map_key_value.get_fields()[1];
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(arrow_err!("Map keys must be required"));
+        }
+
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(arrow_err!("Map values cannot be repeated"));
+        }
+
+        // Extract the arrow fields
+        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
+            Some(DataType::Map(field, sorted)) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(arrow_err!(
+                            "Map data type should contain struct with two children, got {}",
+                            fields.len()
+                        ));
+                    }
+
+                    (Some(field), Some(&fields[0]), Some(&fields[1]), *sorted)
+                }
+                d => {
+                    return Err(arrow_err!(
+                        "Map data type should contain struct got {}",
+                        d
+                    ));
+                }
+            },
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected map got {}",
+                    d
+                ))
+            }
+            None => (None, None, None, false),
+        };
+
+        let maybe_key = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_key.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_key, context)?
+        };
+
+        let maybe_value = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_value.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_value, context)?
+        };
+
+        // Need both columns to be projected
+        match (maybe_key, maybe_value) {
+            (Some(key), Some(value)) => {
+                let key_field = convert_field(map_key, &key, arrow_key);
+                let value_field = convert_field(map_value, &value, arrow_value);
+
+                let map_field = Field::new(
+                    map_key_value.name(),
+                    DataType::Struct(vec![key_field, value_field]),
+                    false, // The inner map field is always non-nullable (#1697)
+                )
+                .with_metadata(arrow_map.and_then(|f| f.metadata().cloned()));
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type: DataType::Map(Box::new(map_field), sorted),
+                    field_type: ParquetFieldType::Group {
+                        children: vec![key, value],
+                    },
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn visit_list(
+        &mut self,
+        list_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if list_type.is_primitive() {
+            return Err(arrow_err!(
+                "{:?} is a list type and can't be processed as primitive.",
+                list_type
+            ));
+        }
+
+        let fields = list_type.get_fields();
+        if fields.len() != 1 {
+            return Err(arrow_err!(
+                "list type must have a single child, found {}",
+                fields.len()
+            ));
+        }
+
+        let repeated_field = &fields[0];
+        if get_repetition(repeated_field) != Repetition::REPEATED {
+            return Err(arrow_err!("List child must be repeated"));
+        }
+
+        // If the list is nullable
+        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => (context.def_level, false),
+            Repetition::OPTIONAL => (context.def_level + 1, true),
+            Repetition::REPEATED => {
+                return Err(arrow_err!("List type cannot be repeated"))
+            }
+        };
+
+        let arrow_field = match &context.data_type {
+            Some(DataType::List(f)) => Some(f.as_ref()),
+            Some(DataType::LargeList(f)) => Some(f.as_ref()),
+            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected list got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        if repeated_field.is_primitive() {
+            // If the repeated field is not a group, then its type is the element type and elements are required.
+            //
+            // required/optional group my_list (LIST) {
+            //   repeated int32 element;
+            // }
+            //
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_primitive(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    // visit_primitive will infer a non-nullable list, update if necessary
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        let items = repeated_field.get_fields();
+        if items.len() != 1
+            || repeated_field.name() == "array"
+            || repeated_field.name() == format!("{}_tuple", list_type.name())
+        {
+            // If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
+            //
+            // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name
+            // with _tuple appended then the repeated type is the element type and elements are required.
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_struct(repeated_field, context) {
+                Ok(Some(mut field)) => {
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        // Regular list handling logic
+        let item_type = &items[0];
+        let rep_level = context.rep_level + 1;
+        let def_level = def_level + 1;
+
+        let new_context = VisitorContext {
+            def_level,
+            rep_level,
+            data_type: arrow_field.map(|f| f.data_type().clone()),
+        };
+
+        match self.dispatch(item_type, new_context) {
+            Ok(Some(item)) => {
+                let item_field = Box::new(convert_field(item_type, &item, arrow_field));
+
+                // Use arrow type as hint for index size
+                let arrow_type = match context.data_type {
+                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
+                    Some(DataType::FixedSizeList(_, len)) => {
+                        DataType::FixedSizeList(item_field, len)
+                    }
+                    _ => DataType::List(item_field),
+                };
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type,
+                    field_type: ParquetFieldType::Group {
+                        children: vec![item],
+                    },
+                }))
+            }
+            r => r,
+        }
+    }
+
+    fn dispatch(
+        &mut self,
+        cur_type: &TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if cur_type.is_primitive() {
+            self.visit_primitive(cur_type, context)
+        } else {
+            match cur_type.get_basic_info().converted_type() {
+                ConvertedType::LIST => self.visit_list(cur_type, context),
+                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
+                    self.visit_map(cur_type, context)
+                }
+                _ => self.visit_struct(cur_type, context),
+            }
+        }
+    }
+}
+
+/// Computes the [`Field`] for a child column
+///
+/// The resulting [`Field`] will have the type dictated by `field`, a name
+/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
+fn convert_field(
+    parquet_type: &Type,
+    field: &ParquetField,
+    arrow_hint: Option<&Field>,
+) -> Field {
+    let name = parquet_type.name();
+    let data_type = field.arrow_type.clone();
+    let nullable = field.nullable;
+
+    match arrow_hint {
+        Some(hint) => {
+            // If the inferred type is a dictionary, preserve dictionary metadata
+            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
+                (DataType::Dictionary(_, _), Some(id), Some(ordered)) => {
+                    Field::new_dict(name, data_type, nullable, id, ordered)
+                }
+                _ => Field::new(name, data_type, nullable),
+            };
+
+            field.with_metadata(hint.metadata().cloned())
+        }
+        None => Field::new(name, data_type, nullable),
+    }
+}
+
+/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
+/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
+/// [`Schema`] embedded in the parquet metadata
+///
+/// Note: This does not support out of order column projection
+pub fn convert_schema<T: IntoIterator<Item = usize>>(
+    schema: &SchemaDescriptor,
+    leaf_columns: T,
+    embedded_arrow_schema: Option<&Schema>,
+) -> Result<Option<ParquetField>> {
+    let mut leaf_mask = vec![false; schema.num_columns()];
+    let mut last_idx = 0;
+    for i in leaf_columns {
+        if i < last_idx {
+            return Err(general_err!("out of order projection is not supported"));
+        }
+        if leaf_mask[i] {
+            return Err(general_err!("repeated column projection is not supported, column {} appeared multiple times", i));
+        }
+
+        last_idx = i;
+        leaf_mask[i] = true;
+    }
+
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: leaf_mask,
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+    };
+
+    visitor.dispatch(&schema.root_schema_ptr(), context)
+}
+
+/// Computes the [`ParquetField`] for the provided `parquet_type`
+pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: vec![true],
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: None,
+    };
+
+    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
+}
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
new file mode 100644
index 000000000..0816b6b2f
--- /dev/null
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the schema are advisory,
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+/// Uses an type hint from the embedded arrow schema to aid in faithfully
+/// reproducing the data as it was written into parquet
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
+    match (&parquet, &hint) {
+        // Not all time units can be represented as LogicalType / ConvertedType
+        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
+        (DataType::Int32, DataType::Time32(_)) => hint,
+        (DataType::Int64, DataType::Time64(_)) => hint,
+
+        // Date64 doesn't have a corresponding LogicalType / ConvertedType
+        (DataType::Int64, DataType::Date64) => hint,
+
+        // Coerce Date32 back to Date64 (#1666)
+        (DataType::Date32, DataType::Date64) => hint,
+
+        // Determine timezone
+        (DataType::Timestamp(p, None), DataType::Timestamp(h, Some(_))) if p == h => hint,
+
+        // Determine offset size
+        (DataType::Utf8, DataType::LargeUtf8) => hint,
+        (DataType::Binary, DataType::LargeBinary) => hint,
+
+        // Determine interval time unit (#1666)
+        (DataType::Interval(_), DataType::Interval(_)) => hint,
+
+        // Potentially preserve dictionary encoding
+        (_, DataType::Dictionary(_, value)) => {
+            // Apply hint to inner type
+            let hinted = apply_hint(parquet, value.as_ref().clone());
+
+            // If matches dictionary value - preserve dictionary
+            // otherwise use hinted inner type
+            match &hinted == value.as_ref() {
+                true => hint,
+                false => hinted,
+            }
+        }
+        _ => parquet,
+    }
+}
+
+fn from_parquet(parquet_type: &Type) -> Result<DataType> {
+    match parquet_type {
+        Type::PrimitiveType {
+            physical_type,
+            basic_info,
+            type_length,
+            scale,
+            precision,
+            ..
+        } => match physical_type {
+            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
+            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
+            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+            PhysicalType::FLOAT => Ok(DataType::Float32),
+            PhysicalType::DOUBLE => Ok(DataType::Float64),
+            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
+            }
+        },
+        Type::GroupType { .. } => unreachable!(),
+    }
+}
+
+fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+    let scale = scale
+        .try_into()
+        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+    let precision = precision
+        .try_into()
+        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
+
+    Ok(DataType::Decimal(precision, scale))
+}
+
+fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int32),
+        (
+            Some(
+                ref t @ LogicalType::Integer {
+                    bit_width,
+                    is_signed,
+                },
+            ),
+            _,
+        ) => match (bit_width, is_signed) {
+            (8, true) => Ok(DataType::Int8),
+            (16, true) => Ok(DataType::Int16),
+            (32, true) => Ok(DataType::Int32),
+            (8, false) => Ok(DataType::UInt8),
+            (16, false) => Ok(DataType::UInt16),
+            (32, false) => Ok(DataType::UInt32),
+            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
+        },
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+            _ => Err(arrow_err!(
+                "Cannot create INT32 physical type from {:?}",
+                unit
+            )),
+        },
+        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
+        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
+        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
+        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
+        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
+        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
+        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
+        (None, ConvertedType::DATE) => Ok(DataType::Date32),
+        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int64),
+        (
+            Some(LogicalType::Integer {
+                bit_width,
+                is_signed,
+            }),
+            _,
+        ) if bit_width == 64 => match is_signed {
+            true => Ok(DataType::Int64),
+            false => Ok(DataType::UInt64),
+        },
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => {
+                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
+            }
+            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
+        },
+        (
+            Some(LogicalType::Timestamp {
+                is_adjusted_to_u_t_c,
+                unit,
+            }),
+            _,
+        ) => Ok(DataType::Timestamp(
+            match unit {
+                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
+                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
+                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
+            },
+            if is_adjusted_to_u_t_c {
+                Some("UTC".to_string())
+            } else {
+                None
+            },
+        )),
+        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
+        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
+        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+        (None, ConvertedType::TIMESTAMP_MILLIS) => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        (None, ConvertedType::TIMESTAMP_MICROS) => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
+        (Some(LogicalType::Json), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
+        (None, ConvertedType::NONE) => Ok(DataType::Binary),
+        (None, ConvertedType::JSON) => Ok(DataType::Binary),
+        (None, ConvertedType::BSON) => Ok(DataType::Binary),
+        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
+        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_fixed_len_byte_array(
+    info: &BasicTypeInfo,
+    scale: i32,
+    precision: i32,
+    type_length: i32,
+) -> Result<DataType> {
+    // TODO: This should check the type length for the decimal and interval types
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::INTERVAL) => {
+            // There is currently no reliable way of determining which IntervalUnit
+            // to return. Thus without the original Arrow schema, the results
+            // would be incorrect if all 12 bytes of the interval are populated
+            Ok(DataType::Interval(IntervalUnit::DayTime))
+        }
+        _ => Ok(DataType::FixedSizeBinary(type_length)),
+    }
+}
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index be1a22192..fcbb846f1 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -135,6 +135,14 @@ macro_rules! eof_err {
     ($fmt:expr, $($args:expr),*) => (ParquetError::EOF(format!($fmt, $($args),*)));
 }
 
+macro_rules! arrow_err {
+    ($fmt:expr) => (ParquetError::ArrowError($fmt.to_owned()));
+    ($fmt:expr, $($args:expr),*) => (ParquetError::ArrowError(format!($fmt, $($args),*)));
+    ($e:expr, $fmt:expr) => (ParquetError::ArrowError($fmt.to_owned(), $e));
+    ($e:ident, $fmt:expr, $($args:tt),*) => (
+        ParquetError::ArrowError(&format!($fmt, $($args),*), $e));
+}
+
 // ----------------------------------------------------------------------
 // Convert parquet error into other errors