Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/25 09:48:05 UTC

[arrow-rs] branch master updated: Add FixedLengthByteArrayReader Remove ComplexObjectArrayReader (#2528)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e4a4553e Add FixedLengthByteArrayReader Remove ComplexObjectArrayReader (#2528)
8e4a4553e is described below

commit 8e4a4553e4330aa1a9d8dd7dbb7293165cc0cc40
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Aug 25 10:47:59 2022 +0100

    Add FixedLengthByteArrayReader Remove ComplexObjectArrayReader (#2528)
    
    * Remove ComplexObjectArrayReader add FixedLengthByteArrayReader
    
    * Add Interval support
    
    * Add more tests
    
    * Handle all null array
    
    * Review feedback
---
 parquet/src/arrow/array_reader/builder.rs          |  80 +--
 .../src/arrow/array_reader/complex_object_array.rs | 605 ---------------------
 .../src/arrow/array_reader/fixed_len_byte_array.rs | 475 ++++++++++++++++
 parquet/src/arrow/array_reader/mod.rs              |   8 +-
 parquet/src/arrow/arrow_reader/mod.rs              |  28 +-
 parquet/src/arrow/buffer/converter.rs              | 238 --------
 parquet/src/arrow/buffer/mod.rs                    |   1 -
 parquet/src/arrow/record_reader/mod.rs             |  12 +-
 parquet/src/data_type.rs                           |  12 +
 9 files changed, 533 insertions(+), 926 deletions(-)
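
For orientation before the diffs: this commit replaces the generic, converter-based ComplexObjectArrayReader with a dedicated FixedLenByteArrayReader behind a single constructor. A minimal sketch of how a caller now obtains a reader for a FIXED_LEN_BYTE_ARRAY column, assuming `pages` and `column_desc` are already in hand (`reader_for_flba` is a hypothetical name; the real call site is in build_primitive_reader below):

    use crate::arrow::array_reader::{make_fixed_len_byte_array_reader, ArrayReader};
    use crate::column::page::PageIterator;
    use crate::errors::Result;
    use crate::schema::types::ColumnDescPtr;

    fn reader_for_flba(
        pages: Box<dyn PageIterator>,
        column_desc: ColumnDescPtr,
    ) -> Result<Box<dyn ArrayReader>> {
        // Passing None lets the reader derive the Arrow type from the Parquet
        // schema; passing Some(DataType::Decimal128(p, s)) or an Interval type
        // overrides the inference, exactly as in the builder change below.
        make_fixed_len_byte_array_reader(pages, column_desc, None)
    }

The constructor validates the requested Arrow type against the column's declared byte length (at most 16 bytes for Decimal128, exactly 12 for Interval) before building the reader.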

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index d944ff2dc..5f3ce7582 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -17,26 +17,20 @@
 
 use std::sync::Arc;
 
-use arrow::datatypes::{DataType, IntervalUnit, SchemaRef};
+use arrow::datatypes::{DataType, SchemaRef};
 
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
+use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::{
     make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
-    ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
-    PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
-};
-use crate::arrow::buffer::converter::{
-    DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
-    FixedLenBinaryConverter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
-    IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
-    IntervalYearMonthConverter,
+    ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader,
+    RowGroupCollection, StructArrayReader,
 };
 use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
 use crate::arrow::ProjectionMask;
 use crate::basic::Type as PhysicalType;
 use crate::data_type::{
-    BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
-    Int64Type, Int96Type,
+    BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type,
 };
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
@@ -127,14 +121,12 @@ fn build_primitive_reader(
     field: &ParquetField,
     row_groups: &dyn RowGroupCollection,
 ) -> Result<Box<dyn ArrayReader>> {
-    let (col_idx, primitive_type, type_len) = match &field.field_type {
+    let (col_idx, primitive_type) = match &field.field_type {
         ParquetFieldType::Primitive {
             col_idx,
             primitive_type,
         } => match primitive_type.as_ref() {
-            Type::PrimitiveType { type_length, .. } => {
-                (*col_idx, primitive_type.clone(), *type_length)
-            }
+            Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
             Type::GroupType { .. } => unreachable!(),
         },
         _ => unreachable!(),
@@ -204,61 +196,9 @@ fn build_primitive_reader(
             }
             _ => make_byte_array_reader(page_iterator, column_desc, arrow_type),
         },
-        PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
-            DataType::Decimal128(precision, scale) => {
-                let converter = DecimalFixedLengthByteArrayConverter::new(
-                    DecimalArrayConverter::new(precision, scale),
-                );
-                Ok(Box::new(ComplexObjectArrayReader::<
-                    FixedLenByteArrayType,
-                    DecimalFixedLengthByteArrayConverter,
-                >::new(
-                    page_iterator,
-                    column_desc,
-                    converter,
-                    arrow_type,
-                )?))
-            }
-            DataType::Interval(IntervalUnit::DayTime) => {
-                let converter =
-                    IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {});
-                Ok(Box::new(ComplexObjectArrayReader::<
-                    FixedLenByteArrayType,
-                    _,
-                >::new(
-                    page_iterator,
-                    column_desc,
-                    converter,
-                    arrow_type,
-                )?))
-            }
-            DataType::Interval(IntervalUnit::YearMonth) => {
-                let converter =
-                    IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {});
-                Ok(Box::new(ComplexObjectArrayReader::<
-                    FixedLenByteArrayType,
-                    _,
-                >::new(
-                    page_iterator,
-                    column_desc,
-                    converter,
-                    arrow_type,
-                )?))
-            }
-            _ => {
-                let converter =
-                    FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len));
-                Ok(Box::new(ComplexObjectArrayReader::<
-                    FixedLenByteArrayType,
-                    FixedLenBinaryConverter,
-                >::new(
-                    page_iterator,
-                    column_desc,
-                    converter,
-                    arrow_type,
-                )?))
-            }
-        },
+        PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+            make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)
+        }
     }
 }
 
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
deleted file mode 100644
index 4f958fea4..000000000
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ /dev/null
@@ -1,605 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::arrow::array_reader::ArrayReader;
-use crate::arrow::buffer::converter::Converter;
-use crate::arrow::schema::parquet_to_arrow_field;
-use crate::column::page::PageIterator;
-use crate::column::reader::ColumnReaderImpl;
-use crate::data_type::DataType;
-use crate::errors::Result;
-use crate::schema::types::ColumnDescPtr;
-use arrow::array::ArrayRef;
-use arrow::datatypes::DataType as ArrowType;
-use std::any::Any;
-use std::marker::PhantomData;
-
-/// Complex object array readers are leaves of the array reader tree. They accept a
-/// page iterator and convert the decoded values into arrays via a [`Converter`].
-pub struct ComplexObjectArrayReader<T, C>
-where
-    T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
-{
-    data_type: ArrowType,
-    pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Vec<i16>>,
-    rep_levels_buffer: Option<Vec<i16>>,
-    data_buffer: Vec<T::T>,
-    column_desc: ColumnDescPtr,
-    column_reader: Option<ColumnReaderImpl<T>>,
-    converter: C,
-    in_progress_def_levels_buffer: Option<Vec<i16>>,
-    in_progress_rep_levels_buffer: Option<Vec<i16>>,
-    before_consume: bool,
-    _parquet_type_marker: PhantomData<T>,
-    _converter_marker: PhantomData<C>,
-}
-
-impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
-where
-    T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
-{
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_data_type(&self) -> &ArrowType {
-        &self.data_type
-    }
-
-    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        if !self.before_consume {
-            self.before_consume = true;
-        }
-        // Try to initialize column reader
-        if self.column_reader.is_none() {
-            self.next_column_reader()?;
-        }
-
-        let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
-        data_buffer.resize_with(batch_size, T::T::default);
-
-        let mut def_levels_buffer = if self.column_desc.max_def_level() > 0 {
-            let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
-            buf.resize_with(batch_size, || 0);
-            Some(buf)
-        } else {
-            None
-        };
-
-        let mut rep_levels_buffer = if self.column_desc.max_rep_level() > 0 {
-            let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
-            buf.resize_with(batch_size, || 0);
-            Some(buf)
-        } else {
-            None
-        };
-
-        let mut num_read = 0;
-
-        while self.column_reader.is_some() && num_read < batch_size {
-            let num_to_read = batch_size - num_read;
-            let cur_data_buf = &mut data_buffer[num_read..];
-            let cur_def_levels_buf =
-                def_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
-            let cur_rep_levels_buf =
-                rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
-            let (data_read, levels_read) =
-                self.column_reader.as_mut().unwrap().read_batch(
-                    num_to_read,
-                    cur_def_levels_buf,
-                    cur_rep_levels_buf,
-                    cur_data_buf,
-                )?;
-
-            // Fill space
-            if levels_read > data_read {
-                def_levels_buffer.iter().for_each(|def_levels_buffer| {
-                    let (mut level_pos, mut data_pos) = (levels_read, data_read);
-                    while level_pos > 0 && data_pos > 0 {
-                        if def_levels_buffer[num_read + level_pos - 1]
-                            == self.column_desc.max_def_level()
-                        {
-                            cur_data_buf.swap(level_pos - 1, data_pos - 1);
-                            level_pos -= 1;
-                            data_pos -= 1;
-                        } else {
-                            level_pos -= 1;
-                        }
-                    }
-                });
-            }
-
-            let values_read = levels_read.max(data_read);
-            num_read += values_read;
-            // current page exhausted && page iterator exhausted
-            if values_read < num_to_read && !self.next_column_reader()? {
-                break;
-            }
-        }
-        data_buffer.truncate(num_read);
-        def_levels_buffer
-            .iter_mut()
-            .for_each(|buf| buf.truncate(num_read));
-        rep_levels_buffer
-            .iter_mut()
-            .for_each(|buf| buf.truncate(num_read));
-
-        if let Some(mut def_levels_buffer) = def_levels_buffer {
-            match &mut self.in_progress_def_levels_buffer {
-                None => {
-                    self.in_progress_def_levels_buffer = Some(def_levels_buffer);
-                }
-                Some(buf) => buf.append(&mut def_levels_buffer),
-            }
-        }
-
-        if let Some(mut rep_levels_buffer) = rep_levels_buffer {
-            match &mut self.in_progress_rep_levels_buffer {
-                None => {
-                    self.in_progress_rep_levels_buffer = Some(rep_levels_buffer);
-                }
-                Some(buf) => buf.append(&mut rep_levels_buffer),
-            }
-        }
-
-        self.data_buffer.append(&mut data_buffer);
-
-        Ok(num_read)
-    }
-
-    fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let data: Vec<Option<T::T>> = if self.in_progress_def_levels_buffer.is_some() {
-            let data_buffer = std::mem::take(&mut self.data_buffer);
-            data_buffer
-                .into_iter()
-                .zip(self.in_progress_def_levels_buffer.as_ref().unwrap().iter())
-                .map(|(t, def_level)| {
-                    if *def_level == self.column_desc.max_def_level() {
-                        Some(t)
-                    } else {
-                        None
-                    }
-                })
-                .collect()
-        } else {
-            self.data_buffer.iter().map(|x| Some(x.clone())).collect()
-        };
-
-        let mut array = self.converter.convert(data)?;
-
-        if let ArrowType::Dictionary(_, _) = self.data_type {
-            array = arrow::compute::cast(&array, &self.data_type)?;
-        }
-
-        self.data_buffer = vec![];
-        self.def_levels_buffer = std::mem::take(&mut self.in_progress_def_levels_buffer);
-        self.rep_levels_buffer = std::mem::take(&mut self.in_progress_rep_levels_buffer);
-        self.before_consume = false;
-
-        Ok(array)
-    }
-
-    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        let mut num_read = 0;
-        while (self.column_reader.is_some() || self.next_column_reader()?)
-            && num_read < num_records
-        {
-            let remain_to_skip = num_records - num_read;
-            let skip = self
-                .column_reader
-                .as_mut()
-                .unwrap()
-                .skip_records(remain_to_skip)?;
-            num_read += skip;
-            //  skip < remain_to_skip means end of row group
-            //  self.next_column_reader() == false means end of file
-            if skip < remain_to_skip && !self.next_column_reader()? {
-                break;
-            }
-        }
-        Ok(num_read)
-    }
-
-    fn get_def_levels(&self) -> Option<&[i16]> {
-        if self.before_consume {
-            self.in_progress_def_levels_buffer.as_deref()
-        } else {
-            self.def_levels_buffer.as_deref()
-        }
-    }
-
-    fn get_rep_levels(&self) -> Option<&[i16]> {
-        if self.before_consume {
-            self.in_progress_rep_levels_buffer.as_deref()
-        } else {
-            self.rep_levels_buffer.as_deref()
-        }
-    }
-}
-
-impl<T, C> ComplexObjectArrayReader<T, C>
-where
-    T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
-{
-    pub fn new(
-        pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        converter: C,
-        arrow_type: Option<ArrowType>,
-    ) -> Result<Self> {
-        let data_type = match arrow_type {
-            Some(t) => t,
-            None => parquet_to_arrow_field(column_desc.as_ref())?
-                .data_type()
-                .clone(),
-        };
-
-        Ok(Self {
-            data_type,
-            pages,
-            def_levels_buffer: None,
-            rep_levels_buffer: None,
-            data_buffer: vec![],
-            column_desc,
-            column_reader: None,
-            converter,
-            in_progress_def_levels_buffer: None,
-            in_progress_rep_levels_buffer: None,
-            before_consume: true,
-            _parquet_type_marker: PhantomData,
-            _converter_marker: PhantomData,
-        })
-    }
-
-    fn next_column_reader(&mut self) -> Result<bool> {
-        Ok(match self.pages.next() {
-            Some(page) => {
-                self.column_reader =
-                    Some(ColumnReaderImpl::<T>::new(self.column_desc.clone(), page?));
-                true
-            }
-            None => false,
-        })
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
-    use crate::basic::Encoding;
-    use crate::column::page::Page;
-    use crate::data_type::{ByteArray, ByteArrayType};
-    use crate::schema::parser::parse_message_type;
-    use crate::schema::types::SchemaDescriptor;
-    use crate::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
-    use arrow::array::StringArray;
-    use rand::{thread_rng, Rng};
-    use std::sync::Arc;
-
-    #[test]
-    fn test_complex_array_reader_no_pages() {
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-        let column_desc = schema.column(0);
-        let pages: Vec<Vec<Page>> = Vec::new();
-        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
-
-        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-        let mut array_reader =
-            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-                Box::new(page_iterator),
-                column_desc,
-                converter,
-                None,
-            )
-            .unwrap();
-
-        let values_per_page = 100; // this value is arbitrary in this test - the result should always be an array of 0 length
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), 0);
-    }
-
-    #[test]
-    fn test_complex_array_reader_def_and_rep_levels() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let num_pages = 2;
-        let values_per_page = 100;
-        let str_base = "Hello World";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let max_def_level = schema.column(0).max_def_level();
-        let max_rep_level = schema.column(0).max_rep_level();
-
-        assert_eq!(max_def_level, 2);
-        assert_eq!(max_rep_level, 1);
-
-        let mut rng = thread_rng();
-        let column_desc = schema.column(0);
-        let mut pages: Vec<Vec<Page>> = Vec::new();
-
-        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
-        for i in 0..num_pages {
-            let mut values = Vec::with_capacity(values_per_page);
-
-            for _ in 0..values_per_page {
-                let def_level = rng.gen_range(0..max_def_level + 1);
-                let rep_level = rng.gen_range(0..max_rep_level + 1);
-                if def_level == max_def_level {
-                    let len = rng.gen_range(1..str_base.len());
-                    let slice = &str_base[..len];
-                    values.push(ByteArray::from(slice));
-                    all_values.push(Some(slice.to_string()));
-                } else {
-                    all_values.push(None)
-                }
-                rep_levels.push(rep_level);
-                def_levels.push(def_level)
-            }
-
-            let range = i * values_per_page..(i + 1) * values_per_page;
-            let mut pb =
-                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
-
-            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
-            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
-            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
-
-            let data_page = pb.consume();
-            pages.push(vec![data_page]);
-        }
-
-        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
-
-        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-        let mut array_reader =
-            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-                Box::new(page_iterator),
-                column_desc,
-                converter,
-                None,
-            )
-            .unwrap();
-
-        let mut accu_len: usize = 0;
-
-        let len = array_reader.read_records(values_per_page / 2).unwrap();
-        assert_eq!(len, values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + len)]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + len)]),
-            array_reader.get_rep_levels()
-        );
-        accu_len += len;
-        array_reader.consume_batch().unwrap();
-
-        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
-        // and the last values_per_page/2 ones are from the second column chunk
-        let len = array_reader.read_records(values_per_page).unwrap();
-        assert_eq!(len, values_per_page);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + len)]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + len)]),
-            array_reader.get_rep_levels()
-        );
-        let array = array_reader.consume_batch().unwrap();
-        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
-        for i in 0..array.len() {
-            if array.is_valid(i) {
-                assert_eq!(
-                    all_values[i + accu_len].as_ref().unwrap().as_str(),
-                    strings.value(i)
-                )
-            } else {
-                assert_eq!(all_values[i + accu_len], None)
-            }
-        }
-        accu_len += len;
-
-        // Try to read values_per_page values, however there are only values_per_page/2 values
-        let len = array_reader.read_records(values_per_page).unwrap();
-        assert_eq!(len, values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + len)]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + len)]),
-            array_reader.get_rep_levels()
-        );
-        array_reader.consume_batch().unwrap();
-    }
-
-    #[test]
-    fn test_complex_array_reader_dict_enc_string() {
-        use crate::encodings::encoding::{DictEncoder, Encoder};
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let num_pages = 2;
-        let values_per_page = 100;
-        let str_base = "Hello World";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-        let column_desc = schema.column(0);
-        let max_def_level = column_desc.max_def_level();
-        let max_rep_level = column_desc.max_rep_level();
-
-        assert_eq!(max_def_level, 2);
-        assert_eq!(max_rep_level, 1);
-
-        let mut rng = thread_rng();
-        let mut pages: Vec<Vec<Page>> = Vec::new();
-
-        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
-        for i in 0..num_pages {
-            let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
-            // add data page
-            let mut values = Vec::with_capacity(values_per_page);
-
-            for _ in 0..values_per_page {
-                let def_level = rng.gen_range(0..max_def_level + 1);
-                let rep_level = rng.gen_range(0..max_rep_level + 1);
-                if def_level == max_def_level {
-                    let len = rng.gen_range(1..str_base.len());
-                    let slice = &str_base[..len];
-                    values.push(ByteArray::from(slice));
-                    all_values.push(Some(slice.to_string()));
-                } else {
-                    all_values.push(None)
-                }
-                rep_levels.push(rep_level);
-                def_levels.push(def_level)
-            }
-
-            let range = i * values_per_page..(i + 1) * values_per_page;
-            let mut pb =
-                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
-            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
-            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
-            let _ = dict_encoder.put(&values);
-            let indices = dict_encoder
-                .write_indices()
-                .expect("write_indices() should be OK");
-            pb.add_indices(indices);
-            let data_page = pb.consume();
-            // for each page log num_values vs actual values in page
-            // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
-            // add dictionary page
-            let dict = dict_encoder
-                .write_dict()
-                .expect("write_dict() should be OK");
-            let dict_page = Page::DictionaryPage {
-                buf: dict,
-                num_values: dict_encoder.num_entries() as u32,
-                encoding: Encoding::RLE_DICTIONARY,
-                is_sorted: false,
-            };
-            pages.push(vec![dict_page, data_page]);
-        }
-
-        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
-        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-        let mut array_reader =
-            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-                Box::new(page_iterator),
-                column_desc,
-                converter,
-                None,
-            )
-            .unwrap();
-
-        let mut accu_len: usize = 0;
-
-        // println!("---------- reading a batch of {} values ----------", values_per_page / 2);
-        let len = array_reader.read_records(values_per_page / 2).unwrap();
-        assert_eq!(len, values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + len)]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + len)]),
-            array_reader.get_rep_levels()
-        );
-        accu_len += len;
-        array_reader.consume_batch().unwrap();
-
-        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
-        // and the last values_per_page/2 ones are from the second column chunk
-        // println!("---------- reading a batch of {} values ----------", values_per_page);
-        //let array = array_reader.next_batch(values_per_page).unwrap();
-        let len = array_reader.read_records(values_per_page).unwrap();
-        assert_eq!(len, values_per_page);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + len)]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + len)]),
-            array_reader.get_rep_levels()
-        );
-        let array = array_reader.consume_batch().unwrap();
-        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
-        for i in 0..array.len() {
-            if array.is_valid(i) {
-                assert_eq!(
-                    all_values[i + accu_len].as_ref().unwrap().as_str(),
-                    strings.value(i)
-                )
-            } else {
-                assert_eq!(all_values[i + accu_len], None)
-            }
-        }
-        accu_len += len;
-
-        // Try to read values_per_page values, however there are only values_per_page/2 values
-        // println!("---------- reading a batch of {} values ----------", values_per_page);
-        let len = array_reader.read_records(values_per_page).unwrap();
-        assert_eq!(len, values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + len)]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + len)]),
-            array_reader.get_rep_levels()
-        );
-        array_reader.consume_batch().unwrap();
-    }
-}
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
new file mode 100644
index 000000000..ba3a02c4f
--- /dev/null
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
+use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ValuesBuffer};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{Encoding, Type};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
+use arrow::array::{
+    ArrayDataBuilder, ArrayRef, Decimal128Array, FixedSizeBinaryArray,
+    IntervalDayTimeArray, IntervalYearMonthArray,
+};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{DataType as ArrowType, IntervalUnit};
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
+pub fn make_fixed_len_byte_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    let byte_length = match column_desc.physical_type() {
+        Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
+        t => {
+            return Err(general_err!(
+                "invalid physical type for fixed length byte array reader - {}",
+                t
+            ))
+        }
+    };
+
+    match &data_type {
+        ArrowType::FixedSizeBinary(_) => {}
+        ArrowType::Decimal128(_, _) => {
+            if byte_length > 16 {
+                return Err(general_err!(
+                    "decimal 128 type too large, must be less than 16 bytes, got {}",
+                    byte_length
+                ));
+            }
+        }
+        ArrowType::Interval(_) => {
+            if byte_length != 12 {
+                // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
+                return Err(general_err!(
+                    "interval type must consist of 12 bytes got {}",
+                    byte_length
+                ));
+            }
+        }
+        _ => {
+            return Err(general_err!(
+                "invalid data type for fixed length byte array reader - {}",
+                data_type
+            ))
+        }
+    }
+
+    Ok(Box::new(FixedLenByteArrayReader::new(
+        pages,
+        column_desc,
+        data_type,
+        byte_length,
+    )))
+}
+
+struct FixedLenByteArrayReader {
+    data_type: ArrowType,
+    byte_length: usize,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
+}
+
+impl FixedLenByteArrayReader {
+    fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        data_type: ArrowType,
+        byte_length: usize,
+    ) -> Self {
+        Self {
+            data_type,
+            byte_length,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader: GenericRecordReader::new_with_records(
+                column_desc,
+                FixedLenByteArrayBuffer {
+                    buffer: Default::default(),
+                    byte_length,
+                },
+            ),
+        }
+    }
+}
+
+impl ArrayReader for FixedLenByteArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let record_data = self.record_reader.consume_record_data();
+
+        let array_data =
+            ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+                .len(self.record_reader.num_values())
+                .add_buffer(record_data)
+                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+
+        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
+
+        // TODO: An improvement might be to do this conversion on read
+        let array = match &self.data_type {
+            ArrowType::Decimal128(p, s) => {
+                let decimal = binary
+                    .iter()
+                    .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
+                    .collect::<Decimal128Array>()
+                    .with_precision_and_scale(*p, *s)?;
+
+                Arc::new(decimal)
+            }
+            ArrowType::Interval(unit) => {
+                // An interval is stored as three little-endian unsigned 32-bit
+                // integers holding months, days, and milliseconds respectively
+                match unit {
+                    IntervalUnit::YearMonth => Arc::new(
+                        binary
+                            .iter()
+                            .map(|o| {
+                                o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
+                            })
+                            .collect::<IntervalYearMonthArray>(),
+                    ) as ArrayRef,
+                    IntervalUnit::DayTime => Arc::new(
+                        binary
+                            .iter()
+                            .map(|o| {
+                                o.map(|b| {
+                                    i64::from_le_bytes(b[4..12].try_into().unwrap())
+                                })
+                            })
+                            .collect::<IntervalDayTimeArray>(),
+                    ) as ArrayRef,
+                    IntervalUnit::MonthDayNano => {
+                        return Err(nyi_err!("MonthDayNano intervals not supported"));
+                    }
+                }
+            }
+            _ => Arc::new(binary) as ArrayRef,
+        };
+
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
+        self.record_reader.reset();
+
+        Ok(array)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+}
+
+struct FixedLenByteArrayBuffer {
+    buffer: ScalarBuffer<u8>,
+    /// The length of each element in bytes
+    byte_length: usize,
+}
+
+impl ValuesBufferSlice for FixedLenByteArrayBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl BufferQueue for FixedLenByteArrayBuffer {
+    type Output = Buffer;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        self.buffer.split_off(len * self.byte_length)
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        assert_eq!(self.buffer.len(), len * self.byte_length);
+    }
+}
+
+impl ValuesBuffer for FixedLenByteArrayBuffer {
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        assert_eq!(
+            self.buffer.len(),
+            (read_offset + values_read) * self.byte_length
+        );
+        self.buffer
+            .resize((read_offset + levels_read) * self.byte_length);
+
+        let slice = self.buffer.as_slice_mut();
+
+        let values_range = read_offset..read_offset + values_read;
+        for (value_pos, level_pos) in
+            values_range.rev().zip(iter_set_bits_rev(valid_mask))
+        {
+            debug_assert!(level_pos >= value_pos);
+            if level_pos <= value_pos {
+                break;
+            }
+
+            let level_pos_bytes = level_pos * self.byte_length;
+            let value_pos_bytes = value_pos * self.byte_length;
+
+            for i in 0..self.byte_length {
+                slice[level_pos_bytes + i] = slice[value_pos_bytes + i]
+            }
+        }
+    }
+}
+
+struct ValueDecoder {
+    byte_length: usize,
+    dict_page: Option<ByteBufferPtr>,
+    decoder: Option<Decoder>,
+}
+
+impl ColumnValueDecoder for ValueDecoder {
+    type Slice = FixedLenByteArrayBuffer;
+
+    fn new(col: &ColumnDescPtr) -> Self {
+        Self {
+            byte_length: col.type_length() as usize,
+            dict_page: None,
+            decoder: None,
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if !matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+        ) {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+        let expected_len = num_values as usize * self.byte_length;
+        if expected_len > buf.len() {
+            return Err(general_err!(
+                "too few bytes in dictionary page, expected {} got {}",
+                expected_len,
+                buf.len()
+            ));
+        }
+
+        self.dict_page = Some(buf);
+        Ok(())
+    }
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Result<()> {
+        self.decoder = Some(match encoding {
+            Encoding::PLAIN => Decoder::Plain {
+                buf: data,
+                offset: 0,
+            },
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
+                decoder: DictIndexDecoder::new(data, num_levels, num_values),
+            },
+            Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
+                decoder: DeltaByteArrayDecoder::new(data)?,
+            },
+            _ => {
+                return Err(general_err!(
+                    "unsupported encoding for fixed length byte array: {}",
+                    encoding
+                ))
+            }
+        });
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        assert_eq!(self.byte_length, out.byte_length);
+
+        let len = range.end - range.start;
+        match self.decoder.as_mut().unwrap() {
+            Decoder::Plain { offset, buf } => {
+                let to_read =
+                    (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
+                let end_offset = *offset + to_read * self.byte_length;
+                out.buffer
+                    .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
+                *offset = end_offset;
+                Ok(to_read)
+            }
+            Decoder::Dict { decoder } => {
+                let dict = self.dict_page.as_ref().unwrap();
+                // An empty dictionary page implies every value is null
+                if dict.is_empty() {
+                    return Ok(0);
+                }
+
+                decoder.read(len, |keys| {
+                    out.buffer.reserve(keys.len() * self.byte_length);
+                    for key in keys {
+                        let offset = *key as usize * self.byte_length;
+                        let val = &dict.as_ref()[offset..offset + self.byte_length];
+                        out.buffer.extend_from_slice(val);
+                    }
+                    Ok(())
+                })
+            }
+            Decoder::Delta { decoder } => {
+                let to_read = len.min(decoder.remaining());
+                out.buffer.reserve(to_read * self.byte_length);
+
+                decoder.read(to_read, |slice| {
+                    if slice.len() != self.byte_length {
+                        return Err(general_err!(
+                            "encountered array with incorrect length, got {} expected {}",
+                            slice.len(),
+                            self.byte_length
+                        ));
+                    }
+                    out.buffer.extend_from_slice(slice);
+                    Ok(())
+                })
+            }
+        }
+    }
+
+    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
+        match self.decoder.as_mut().unwrap() {
+            Decoder::Plain { offset, buf } => {
+                let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
+                *offset += to_read * self.byte_length;
+                Ok(to_read)
+            }
+            Decoder::Dict { decoder } => decoder.skip(num_values),
+            Decoder::Delta { decoder } => decoder.skip(num_values),
+        }
+    }
+}
+
+enum Decoder {
+    Plain { buf: ByteBufferPtr, offset: usize },
+    Dict { decoder: DictIndexDecoder },
+    Delta { decoder: DeltaByteArrayDecoder },
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+    use crate::arrow::ArrowWriter;
+    use arrow::array::{Array, Decimal128Array, ListArray};
+    use arrow::datatypes::Field;
+    use arrow::error::Result as ArrowResult;
+    use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_decimal_list() {
+        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
+
+        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
+        let data = ArrayDataBuilder::new(ArrowType::List(Box::new(Field::new(
+            "item",
+            decimals.data_type().clone(),
+            false,
+        ))))
+        .len(7)
+        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
+        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
+        .child_data(vec![decimals.into_data()])
+        .build()
+        .unwrap();
+
+        let written = RecordBatch::try_from_iter([(
+            "list",
+            Arc::new(ListArray::from(data)) as ArrayRef,
+        )])
+        .unwrap();
+
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+        writer.write(&written).unwrap();
+        writer.close().unwrap();
+
+        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(&written.slice(0, 3), &read[0]);
+        assert_eq!(&written.slice(3, 3), &read[1]);
+        assert_eq!(&written.slice(6, 1), &read[2]);
+    }
+}
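
The Decimal128 branch in consume_batch above widens each big-endian FIXED_LEN_BYTE_ARRAY value to an i128 via sign_extend_be. The real helper lives in crate::arrow::buffer::bit_util; the following standalone re-implementation is for illustration only, assuming inputs of at most 16 bytes in big-endian two's complement, as the Parquet DECIMAL encoding specifies:

    // Sign-extend a big-endian byte slice to a 16-byte buffer.
    fn sign_extend_be(b: &[u8]) -> [u8; 16] {
        assert!(b.len() <= 16);
        // Replicate the sign bit of the most significant byte into the padding.
        let fill = if b.first().map_or(false, |&m| m & 0x80 != 0) {
            0xFF
        } else {
            0x00
        };
        let mut out = [fill; 16];
        out[16 - b.len()..].copy_from_slice(b);
        out
    }

    fn decode_decimal(raw: &[u8]) -> i128 {
        i128::from_be_bytes(sign_extend_be(raw))
    }

    fn main() {
        // A 2-byte big-endian 0xFFFE is -2 in two's complement.
        assert_eq!(decode_decimal(&[0xFF, 0xFE]), -2);
    }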
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 480b4d4df..3740f0fae 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -33,8 +33,8 @@ use crate::schema::types::SchemaDescPtr;
 mod builder;
 mod byte_array;
 mod byte_array_dictionary;
-mod complex_object_array;
 mod empty_array;
+mod fixed_len_byte_array;
 mod list_array;
 mod map_array;
 mod null_array;
@@ -47,7 +47,7 @@ mod test_util;
 pub use builder::build_array_reader;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
-pub use complex_object_array::ComplexObjectArrayReader;
+pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
 pub use list_array::ListArrayReader;
 pub use map_array::MapArrayReader;
 pub use null_array::NullArrayReader;
@@ -181,7 +181,7 @@ fn read_records<V, CV>(
     batch_size: usize,
 ) -> Result<usize>
 where
-    V: ValuesBuffer + Default,
+    V: ValuesBuffer,
     CV: ColumnValueDecoder<Slice = V::Slice>,
 {
     let mut records_read = 0usize;
@@ -215,7 +215,7 @@ fn skip_records<V, CV>(
     batch_size: usize,
 ) -> Result<usize>
 where
-    V: ValuesBuffer + Default,
+    V: ValuesBuffer,
     CV: ColumnValueDecoder<Slice = V::Slice>,
 {
     let mut records_skipped = 0usize;
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index c1ee7a474..76e247ae1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -627,9 +627,6 @@ mod tests {
         ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
         ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
     };
-    use crate::arrow::buffer::converter::{
-        Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
-    };
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
     use crate::arrow::{ArrowWriter, ProjectionMask};
     use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
@@ -836,24 +833,41 @@ mod tests {
 
     #[test]
     fn test_fixed_length_binary_column_reader() {
-        let converter = FixedSizeArrayConverter::new(20);
         run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
             20,
             ConvertedType::NONE,
             None,
-            |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()),
+            |vals| {
+                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
+                for val in vals {
+                    match val {
+                        Some(b) => builder.append_value(b).unwrap(),
+                        None => builder.append_null(),
+                    }
+                }
+                Arc::new(builder.finish())
+            },
             &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
         );
     }
 
     #[test]
     fn test_interval_day_time_column_reader() {
-        let converter = IntervalDayTimeArrayConverter {};
         run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
             12,
             ConvertedType::INTERVAL,
             None,
-            |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()),
+            |vals| {
+                Arc::new(
+                    vals.iter()
+                        .map(|x| {
+                            x.as_ref().map(|b| {
+                                i64::from_le_bytes(b.as_ref()[4..12].try_into().unwrap())
+                            })
+                        })
+                        .collect::<IntervalDayTimeArray>(),
+                )
+            },
             &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
         );
     }
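
The expected values in the rewritten interval test mirror the Parquet INTERVAL encoding: 12 bytes holding three little-endian unsigned 32-bit integers for months, days, and milliseconds. A small sketch of that layout (decode_interval is a hypothetical helper, not part of the crate):

    // Split a 12-byte Parquet INTERVAL into (months, days, millis).
    fn decode_interval(b: &[u8; 12]) -> (u32, u32, u32) {
        let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
        let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
        let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
        (months, days, millis)
    }

    fn main() {
        let raw = [1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0];
        assert_eq!(decode_interval(&raw), (1, 2, 3));
    }

Arrow's IntervalYearMonth keeps only bytes 0..4 as an i32 month count, while IntervalDayTime packs days and milliseconds into one i64, which is why both the reader and this test take bytes 4..12 verbatim as a little-endian i64.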
diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs
deleted file mode 100644
index 3db0be4e9..000000000
--- a/parquet/src/arrow/buffer/converter.rs
+++ /dev/null
@@ -1,238 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::data_type::FixedLenByteArray;
-use arrow::array::{
-    Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
-    IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
-    IntervalYearMonthBuilder,
-};
-use std::sync::Arc;
-
-use crate::errors::Result;
-use std::marker::PhantomData;
-
-use crate::arrow::buffer::bit_util::sign_extend_be;
-#[cfg(test)]
-use crate::data_type::ByteArray;
-
-#[cfg(test)]
-use arrow::array::{StringArray, StringBuilder};
-
-/// A converter is used to consume record reader's content and convert it to arrow
-/// primitive array.
-pub trait Converter<S, T> {
-    /// This method converts record reader's buffered content into arrow array.
-    /// It will consume record reader's data, but will not reset record reader's
-    /// state.
-    fn convert(&self, source: S) -> Result<T>;
-}
-
-pub struct FixedSizeArrayConverter {
-    byte_width: i32,
-}
-
-impl FixedSizeArrayConverter {
-    pub fn new(byte_width: i32) -> Self {
-        Self { byte_width }
-    }
-}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, FixedSizeBinaryArray>
-    for FixedSizeArrayConverter
-{
-    fn convert(
-        &self,
-        source: Vec<Option<FixedLenByteArray>>,
-    ) -> Result<FixedSizeBinaryArray> {
-        let mut builder =
-            FixedSizeBinaryBuilder::with_capacity(source.len(), self.byte_width);
-        for v in source {
-            match v {
-                Some(array) => builder.append_value(array.data())?,
-                None => builder.append_null(),
-            }
-        }
-
-        Ok(builder.finish())
-    }
-}
-
-pub struct DecimalArrayConverter {
-    precision: u8,
-    scale: u8,
-}
-
-impl DecimalArrayConverter {
-    pub fn new(precision: u8, scale: u8) -> Self {
-        Self { precision, scale }
-    }
-}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
-    for DecimalArrayConverter
-{
-    fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<Decimal128Array> {
-        let array = source
-            .into_iter()
-            .map(|array| array.map(|array| from_bytes_to_i128(array.data())))
-            .collect::<Decimal128Array>()
-            .with_precision_and_scale(self.precision, self.scale)?;
-
-        Ok(array)
-    }
-}
-
-// Converts a big-endian byte array to an i128.
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
-    // The bytes come from a Parquet file and are big-endian, as defined by the
-    // Parquet format; see the reference:
-    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
-    i128::from_be_bytes(sign_extend_be(b))
-}
-
-/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval,
-/// and interprets them as an i32 value representing the Arrow YearMonth value
-pub struct IntervalYearMonthArrayConverter {}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, IntervalYearMonthArray>
-    for IntervalYearMonthArrayConverter
-{
-    fn convert(
-        &self,
-        source: Vec<Option<FixedLenByteArray>>,
-    ) -> Result<IntervalYearMonthArray> {
-        let mut builder = IntervalYearMonthBuilder::with_capacity(source.len());
-        for v in source {
-            match v {
-                Some(array) => builder.append_value(i32::from_le_bytes(
-                    array.data()[0..4].try_into().unwrap(),
-                )),
-                None => builder.append_null(),
-            }
-        }
-
-        Ok(builder.finish())
-    }
-}
-
-/// An Arrow Interval converter, which reads the last 8 bytes of a Parquet interval,
-/// and interprets them as an i64 value representing the Arrow DayTime value
-pub struct IntervalDayTimeArrayConverter {}
-
-impl Converter<Vec<Option<FixedLenByteArray>>, IntervalDayTimeArray>
-    for IntervalDayTimeArrayConverter
-{
-    fn convert(
-        &self,
-        source: Vec<Option<FixedLenByteArray>>,
-    ) -> Result<IntervalDayTimeArray> {
-        let mut builder = IntervalDayTimeBuilder::with_capacity(source.len());
-        for v in source {
-            match v {
-                Some(array) => builder.append_value(i64::from_le_bytes(
-                    array.data()[4..12].try_into().unwrap(),
-                )),
-                None => builder.append_null(),
-            }
-        }
-
-        Ok(builder.finish())
-    }
-}
-
-#[cfg(test)]
-pub struct Utf8ArrayConverter {}
-
-#[cfg(test)]
-impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
-    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<StringArray> {
-        let data_size = source
-            .iter()
-            .map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0))
-            .sum();
-
-        let mut builder = StringBuilder::with_capacity(source.len(), data_size);
-        for v in source {
-            match v {
-                Some(array) => builder.append_value(array.as_utf8()?),
-                None => builder.append_null(),
-            }
-        }
-
-        Ok(builder.finish())
-    }
-}
-
-#[cfg(test)]
-pub type Utf8Converter =
-    ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
-
-pub type FixedLenBinaryConverter = ArrayRefConverter<
-    Vec<Option<FixedLenByteArray>>,
-    FixedSizeBinaryArray,
-    FixedSizeArrayConverter,
->;
-pub type IntervalYearMonthConverter = ArrayRefConverter<
-    Vec<Option<FixedLenByteArray>>,
-    IntervalYearMonthArray,
-    IntervalYearMonthArrayConverter,
->;
-pub type IntervalDayTimeConverter = ArrayRefConverter<
-    Vec<Option<FixedLenByteArray>>,
-    IntervalDayTimeArray,
-    IntervalDayTimeArrayConverter,
->;
-
-pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter<
-    Vec<Option<FixedLenByteArray>>,
-    Decimal128Array,
-    DecimalArrayConverter,
->;
-
-pub struct ArrayRefConverter<S, A, C> {
-    _source: PhantomData<S>,
-    _array: PhantomData<A>,
-    converter: C,
-}
-
-impl<S, A, C> ArrayRefConverter<S, A, C>
-where
-    A: Array + 'static,
-    C: Converter<S, A> + 'static,
-{
-    pub fn new(converter: C) -> Self {
-        Self {
-            _source: PhantomData,
-            _array: PhantomData,
-            converter,
-        }
-    }
-}
-
-impl<S, A, C> Converter<S, ArrayRef> for ArrayRefConverter<S, A, C>
-where
-    A: Array + 'static,
-    C: Converter<S, A> + 'static,
-{
-    fn convert(&self, source: S) -> Result<ArrayRef> {
-        self.converter
-            .convert(source)
-            .map(|array| Arc::new(array) as ArrayRef)
-    }
-}
diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs
index 5ee89aa1a..cbc795d94 100644
--- a/parquet/src/arrow/buffer/mod.rs
+++ b/parquet/src/arrow/buffer/mod.rs
@@ -18,6 +18,5 @@
 //! Logic for reading data into arrow buffers
 
 pub mod bit_util;
-pub mod converter;
 pub mod dictionary_buffer;
 pub mod offset_buffer;
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 18b4c9e07..6c1c61039 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -78,13 +78,23 @@ where
 {
     /// Create a new [`GenericRecordReader`]
     pub fn new(desc: ColumnDescPtr) -> Self {
+        Self::new_with_records(desc, V::default())
+    }
+}
+
+impl<V, CV> GenericRecordReader<V, CV>
+where
+    V: ValuesBuffer,
+    CV: ColumnValueDecoder<Slice = V::Slice>,
+{
+    pub fn new_with_records(desc: ColumnDescPtr, records: V) -> Self {
         let def_levels = (desc.max_def_level() > 0)
             .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
 
         let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
 
         Self {
-            records: Default::default(),
+            records,
             def_levels,
             rep_levels,
             column_reader: None,
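
The impl split above is what makes FixedLenByteArrayBuffer usable at all: `new` still presumably requires `V: ValuesBuffer + Default` (note the mod.rs change drops the `Default` bound from read_records and skip_records), while `new_with_records` only requires `V: ValuesBuffer`, so a buffer carrying a runtime byte_length, which has no sensible Default, can be injected. A toy sketch of the pattern, with names invented for illustration:

    struct Reader<V> {
        records: V,
    }

    // Only available when V has a Default.
    impl<V: Default> Reader<V> {
        fn new() -> Self {
            Self::with_records(V::default())
        }
    }

    // Available for any V, so callers can supply pre-configured state.
    impl<V> Reader<V> {
        fn with_records(records: V) -> Self {
            Reader { records }
        }
    }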
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 35fec60a0..440160039 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -1208,6 +1208,18 @@ make_type!(
     mem::size_of::<FixedLenByteArray>()
 );
 
+impl AsRef<[u8]> for ByteArray {
+    fn as_ref(&self) -> &[u8] {
+        self.as_bytes()
+    }
+}
+
+impl AsRef<[u8]> for FixedLenByteArray {
+    fn as_ref(&self) -> &[u8] {
+        self.as_bytes()
+    }
+}
+
 impl FromBytes for Int96 {
     type Buffer = [u8; 12];
     fn from_le_bytes(bs: Self::Buffer) -> Self {
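
The new AsRef<[u8]> impls let generic code accept ByteArray and FixedLenByteArray interchangeably; the rewritten interval test relies on this via `b.as_ref()[4..12]`. A minimal sketch of the kind of generic helper this enables (a hypothetical function, assuming the input holds at least 8 bytes):

    // Works for any T: AsRef<[u8]>, including both Parquet byte-array types.
    fn last_eight_as_i64<T: AsRef<[u8]>>(v: &T) -> i64 {
        let b = v.as_ref();
        // Assumes b.len() >= 8, as with a 12-byte INTERVAL value.
        i64::from_le_bytes(b[b.len() - 8..].try_into().unwrap())
    }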