You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/24 17:17:29 UTC

[arrow-rs] branch master updated: Parquet Reader/writer for fixed-size list arrays (#4267)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 741244da7 Parquet Reader/writer for fixed-size list arrays  (#4267)
741244da7 is described below

commit 741244da7846fdcb0b34d24ea90e77025863a88c
Author: Dexter Duckworth <de...@users.noreply.github.com>
AuthorDate: Wed May 24 13:17:22 2023 -0400

    Parquet Reader/writer for fixed-size list arrays  (#4267)
    
    * Initial implementation for writing fixed-size lists to Parquet.
    
    The implementation still needs tests.
    The implementation uses a new `write_fixed_size_list` method instead of `write_list`.
    This is done to avoid the overhead of needlessly calculating list offsets.
    
    * Initial implementation for reading fixed-size lists from Parquet.
    
    The implementation still needs tests.
    
    * Added tests for fixed-size list writer.
    
    Fixed bugs in implementation found via tests.
    
    * Added tests for fixed-size list reader.
    
    Fixed bugs in implementation found via tests.
    
    * Added correct behavior for writing empty fixed-length lists.
    
    Writer now emits the correct definition levels for empty lists.
    Added empty list unit test.
    
    * Added correct behavior for reading empty fixed-length lists.
    
    Reader now handles empty list definition levels correctly.
    Added empty list unit test.
    
    * Fixed linter warnings.
    
    * Added license header to fixed_size_list_array.rs
    
    * Added fixed-size list reader tests from PR review.
    
    * Added fixed-size reader row length sanity checks.
    
    * Simplified fixed-size list case in LevelInfoBuilder constructor.
    
    * Removed dynamic dispatch inside fixed-length list writer.
    
    * Expanded list of structs test for fixed-size list writer.
    
    * Reverted expected levels in fixed-size list writer test.
    
    * Fixed linter warnings.
    
    * Updated list size check in fixed-size list reader.
    
    Converted the check to return an error instead of panicking.
    
    * Small tweak to row length check in fixed-size list reader.
    
    * Fixed bug in fixed-size list level encoding.
    
    Writer now correctly handles child arrays with variable row length.
    Added new unit test to verify the new behavior is correct.
    
    * Added fixed-size list reader test.
    
    Test verifies that reader handles child arrays with variable length correctly.
---
 parquet/src/arrow/array_reader/builder.rs          |  44 +-
 .../arrow/array_reader/fixed_size_list_array.rs    | 688 +++++++++++++++++++++
 parquet/src/arrow/array_reader/mod.rs              |   2 +
 parquet/src/arrow/arrow_writer/levels.rs           | 368 ++++++++++-
 parquet/src/arrow/arrow_writer/mod.rs              |  16 +-
 5 files changed, 1112 insertions(+), 6 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 241a5efe0..5e0d05e89 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -23,8 +23,8 @@ use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::{
     make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
-    ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader,
-    RowGroupCollection, StructArrayReader,
+    FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
+    PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
 };
 use crate::arrow::schema::{ParquetField, ParquetFieldType};
 use crate::arrow::ProjectionMask;
@@ -63,6 +63,9 @@ fn build_reader(
             DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
             DataType::List(_) => build_list_reader(field, mask, false, row_groups),
             DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
+            DataType::FixedSizeList(_, _) => {
+                build_fixed_size_list_reader(field, mask, row_groups)
+            }
             d => unimplemented!("reading group type {} not implemented", d),
         },
     }
@@ -166,6 +169,43 @@ fn build_list_reader(
     Ok(reader)
 }
 
+/// Build array reader for fixed-size list type.
+fn build_fixed_size_list_reader(
+    field: &ParquetField,
+    mask: &ProjectionMask,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Option<Box<dyn ArrayReader>>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let reader = match build_reader(&children[0], mask, row_groups)? {
+        Some(item_reader) => {
+            let item_type = item_reader.get_data_type().clone();
+            let reader = match &field.arrow_type {
+                &DataType::FixedSizeList(ref f, size) => {
+                    let data_type = DataType::FixedSizeList(
+                        Arc::new(f.as_ref().clone().with_data_type(item_type)),
+                        size,
+                    );
+
+                    Box::new(FixedSizeListArrayReader::new(
+                        item_reader,
+                        size as usize,
+                        data_type,
+                        field.def_level,
+                        field.rep_level,
+                        field.nullable,
+                    )) as _
+                }
+                _ => unimplemented!(),
+            };
+            Some(reader)
+        }
+        None => None,
+    };
+    Ok(reader)
+}
+
 /// Creates primitive array reader for each primitive type.
 fn build_primitive_reader(
     field: &ParquetField,
diff --git a/parquet/src/arrow/array_reader/fixed_size_list_array.rs b/parquet/src/arrow/array_reader/fixed_size_list_array.rs
new file mode 100644
index 000000000..4cf68a066
--- /dev/null
+++ b/parquet/src/arrow/array_reader/fixed_size_list_array.rs
@@ -0,0 +1,688 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+        let mut row_len = 0;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                    row_len += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    // Length of the previous row should be equal to:
+                    // - the list's fixed size (valid entries)
+                    // - zero (null entries, start of array)
+                    // Any other length indicates invalid data
+                    if start_idx.is_some() && row_len != self.fixed_size {
+                        return Err(general_err!(
+                            "Encountered misaligned row with length {} (expected length {})",
+                            row_len,
+                            self.fixed_size
+                        ))
+                    }
+                    row_len = 0;
+
+                    if *d >= self.def_level {
+                        row_len += 1;
+
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list
+                            validity.append(*d + 1 == self.def_level);
+                        }
+                    }
+                    child_idx += 1;
+                }
+            }
+            Ok(())
+        })?;
+
+        let child_data = match start_idx {
+            Some(0) => {
+                // No null entries - can reuse original array
+                next_batch_array.to_data()
+            }
+            Some(start) => {
+                // Flush pending child items
+                child_data_builder.extend(0, start, child_idx);
+                child_data_builder.freeze()
+            }
+            None => child_data_builder.freeze(),
+        };
+
+        // Verify total number of elements is aligned with fixed list size
+        if list_len * self.fixed_size != child_data.len() {
+            return Err(general_err!(
+                "fixed-size list length must be a multiple of {} but array contains {} elements",
+                self.fixed_size,
+                child_data.len()
+            ));
+        }
+
+        let mut list_builder = ArrayData::builder(self.get_data_type().clone())
+            .len(list_len)
+            .add_child_data(child_data);
+
+        if let Some(builder) = validity {
+            list_builder = list_builder.null_bit_buffer(Some(builder.into()));
+        }
+
+        let list_data = unsafe { list_builder.build_unchecked() };
+
+        let result_array = FixedSizeListArray::from(list_data);
+        Ok(Arc::new(result_array))
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        self.item_reader.skip_records(num_records)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.item_reader.get_def_levels()
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.item_reader.get_rep_levels()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::{
+        array_reader::{test_util::InMemoryArrayReader, ListArrayReader},
+        arrow_reader::{
+            ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
+        },
+        ArrowWriter,
+    };
+    use arrow::datatypes::{Field, Int32Type};
+    use arrow_array::{
+        builder::{FixedSizeListBuilder, Int32Builder, ListBuilder},
+        cast::AsArray,
+        FixedSizeListArray, ListArray, PrimitiveArray, RecordBatch,
+    };
+    use arrow_buffer::Buffer;
+    use arrow_data::ArrayDataBuilder;
+    use arrow_schema::Schema;
+    use bytes::Bytes;
+
+    #[test]
+    fn test_nullable_list() {
+        // [null, [1, null, 2], null, [3, 4, 5], [null, null, null]]
+        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+            vec![
+                None,
+                Some([Some(1), None, Some(2)]),
+                None,
+                Some([Some(3), Some(4), Some(5)]),
+                Some([None, None, None]),
+            ],
+            3,
+        );
+
+        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+            None,
+            Some(1),
+            None,
+            Some(2),
+            None,
+            Some(3),
+            Some(4),
+            Some(5),
+            None,
+            None,
+            None,
+        ]));
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2]),
+            Some(vec![0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1]),
+        );
+
+        let mut list_array_reader = FixedSizeListArrayReader::new(
+            Box::new(item_array_reader),
+            3,
+            ArrowType::FixedSizeList(
+                Arc::new(Field::new("item", ArrowType::Int32, true)),
+                3,
+            ),
+            2,
+            1,
+            true,
+        );
+        let actual = list_array_reader.next_batch(1024).unwrap();
+        let actual = actual
+            .as_any()
+            .downcast_ref::<FixedSizeListArray>()
+            .unwrap();
+        assert_eq!(&expected, actual)
+    }
+
+    #[test]
+    fn test_required_list() {
+        // [[1, null], [2, 3], [null, null], [4, 5]]
+        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+            vec![
+                Some([Some(1), None]),
+                Some([Some(2), Some(3)]),
+                Some([None, None]),
+                Some([Some(4), Some(5)]),
+            ],
+            2,
+        );
+
+        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            Some(3),
+            None,
+            None,
+            Some(4),
+            Some(5),
+        ]));
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![2, 1, 2, 2, 1, 1, 2, 2]),
+            Some(vec![0, 1, 0, 1, 0, 1, 0, 1]),
+        );
+
+        let mut list_array_reader = FixedSizeListArrayReader::new(
+            Box::new(item_array_reader),
+            2,
+            ArrowType::FixedSizeList(
+                Arc::new(Field::new("item", ArrowType::Int32, true)),
+                2,
+            ),
+            1,
+            1,
+            false,
+        );
+        let actual = list_array_reader.next_batch(1024).unwrap();
+        let actual = actual
+            .as_any()
+            .downcast_ref::<FixedSizeListArray>()
+            .unwrap();
+        assert_eq!(&expected, actual)
+    }
+
+    #[test]
+    fn test_nested_list() {
+        // [
+        //   null,
+        //   [[1, 2]],
+        //   [[null, 3]],
+        //   null,
+        //   [[4, 5]],
+        //   [[null, null]],
+        // ]
+        let l2_type = ArrowType::FixedSizeList(
+            Arc::new(Field::new("item", ArrowType::Int32, true)),
+            2,
+        );
+        let l1_type = ArrowType::FixedSizeList(
+            Arc::new(Field::new("item", l2_type.clone(), false)),
+            1,
+        );
+
+        let array = PrimitiveArray::<Int32Type>::from(vec![
+            None,
+            None,
+            Some(1),
+            Some(2),
+            None,
+            Some(3),
+            None,
+            None,
+            Some(4),
+            Some(5),
+            None,
+            None,
+        ]);
+
+        let l2 = ArrayDataBuilder::new(l2_type.clone())
+            .len(6)
+            .add_child_data(array.into_data())
+            .build()
+            .unwrap();
+
+        let l1 = ArrayDataBuilder::new(l1_type.clone())
+            .len(6)
+            .add_child_data(l2)
+            .null_bit_buffer(Some(Buffer::from([0b110110])))
+            .build()
+            .unwrap();
+
+        let expected = FixedSizeListArray::from(l1);
+
+        let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+            None,
+            Some(1),
+            Some(2),
+            None,
+            Some(3),
+            None,
+            Some(4),
+            Some(5),
+            None,
+            None,
+        ]));
+
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            values,
+            Some(vec![0, 5, 5, 4, 5, 0, 5, 5, 4, 4]),
+            Some(vec![0, 0, 2, 0, 2, 0, 0, 2, 0, 2]),
+        );
+
+        let l2 = FixedSizeListArrayReader::new(
+            Box::new(item_array_reader),
+            2,
+            l2_type,
+            4,
+            2,
+            false,
+        );
+        let mut l1 = FixedSizeListArrayReader::new(Box::new(l2), 1, l1_type, 3, 1, true);
+
+        let expected_1 = expected.slice(0, 2);
+        let expected_2 = expected.slice(2, 4);
+
+        let actual = l1.next_batch(2).unwrap();
+        assert_eq!(actual.as_ref(), &expected_1);
+
+        let actual = l1.next_batch(1024).unwrap();
+        assert_eq!(actual.as_ref(), &expected_2);
+    }
+
+    #[test]
+    fn test_empty_list() {
+        // [null, [], null, []]
+        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+            vec![None, Some([]), None, Some([])],
+            0,
+        );
+
+        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+            None, None, None, None,
+        ]));
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![0, 1, 0, 1]),
+            Some(vec![0, 0, 0, 0]),
+        );
+
+        let mut list_array_reader = FixedSizeListArrayReader::new(
+            Box::new(item_array_reader),
+            0,
+            ArrowType::FixedSizeList(
+                Arc::new(Field::new("item", ArrowType::Int32, true)),
+                0,
+            ),
+            2,
+            1,
+            true,
+        );
+        let actual = list_array_reader.next_batch(1024).unwrap();
+        let actual = actual
+            .as_any()
+            .downcast_ref::<FixedSizeListArray>()
+            .unwrap();
+        assert_eq!(&expected, actual)
+    }
+
+    #[test]
+    fn test_nested_var_list() {
+        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
+        let mut builder =
+            FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
+        builder.values().append_value([Some(1), None, Some(3)]);
+        builder.values().append_null();
+        builder.append(true);
+        builder.values().append_value([Some(4)]);
+        builder.values().append_value([]);
+        builder.append(true);
+        builder.values().append_value([Some(5), Some(6)]);
+        builder.values().append_value([None, None]);
+        builder.append(true);
+        builder.values().append_null();
+        builder.values().append_null();
+        builder.append(false);
+        let expected = builder.finish();
+
+        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
+            Some(1),
+            None,
+            Some(3),
+            None,
+            Some(4),
+            None,
+            Some(5),
+            Some(6),
+            None,
+            None,
+            None,
+        ]));
+
+        let inner_type =
+            ArrowType::List(Arc::new(Field::new("item", ArrowType::Int32, true)));
+        let list_type = ArrowType::FixedSizeList(
+            Arc::new(Field::new("item", inner_type.clone(), true)),
+            2,
+        );
+
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
+            Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
+        );
+
+        let inner_array_reader = ListArrayReader::<i32>::new(
+            Box::new(item_array_reader),
+            inner_type,
+            4,
+            2,
+            true,
+        );
+
+        let mut list_array_reader = FixedSizeListArrayReader::new(
+            Box::new(inner_array_reader),
+            2,
+            list_type,
+            2,
+            1,
+            true,
+        );
+        let actual = list_array_reader.next_batch(1024).unwrap();
+        let actual = actual
+            .as_any()
+            .downcast_ref::<FixedSizeListArray>()
+            .unwrap();
+        assert_eq!(&expected, actual)
+    }
+
+    #[test]
+    fn test_read_list_column() {
+        // This test writes a Parquet file containing a fixed-length array column and a primitive column,
+        // then reads the columns back from the file.
+
+        // [
+        //   [1, 2, 3, null],
+        //   [5, 6, 7, 8],
+        //   null,
+        //   [9, null, 11, 12],
+        //   [null, null, null, null]]
+        let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+            vec![
+                Some(vec![Some(1), Some(2), Some(3), None]),
+                Some(vec![Some(5), Some(6), Some(7), Some(8)]),
+                None,
+                Some(vec![Some(9), None, Some(11), Some(12)]),
+                Some(vec![None, None, None, None]),
+            ],
+            4,
+        );
+
+        // [null, 2, 3, null, 5]
+        let primitive = PrimitiveArray::<Int32Type>::from_iter(vec![
+            None,
+            Some(2),
+            Some(3),
+            None,
+            Some(5),
+        ]);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "list",
+                ArrowType::FixedSizeList(
+                    Arc::new(Field::new("item", ArrowType::Int32, true)),
+                    4,
+                ),
+                true,
+            ),
+            Field::new("primitive", ArrowType::Int32, true),
+        ]));
+
+        // Create record batch with a fixed-length array column and a primitive column
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(list.clone()), Arc::new(primitive.clone())],
+        )
+        .expect("unable to create record batch");
+
+        // Write record batch to Parquet
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)
+            .expect("unable to create parquet writer");
+        writer.write(&batch).expect("unable to write record batch");
+        writer.close().expect("unable to close parquet writer");
+
+        // Read record batch from Parquet
+        let reader = Bytes::from(buffer);
+        let mut batch_reader = ParquetRecordBatchReader::try_new(reader, 1024)
+            .expect("unable to create parquet reader");
+        let actual = batch_reader
+            .next()
+            .expect("missing record batch")
+            .expect("unable to read record batch");
+
+        // Verify values of both read columns match
+        assert_eq!(schema, actual.schema());
+        let actual_list = actual
+            .column(0)
+            .as_any()
+            .downcast_ref::<FixedSizeListArray>()
+            .expect("unable to cast array to FixedSizeListArray");
+        let actual_primitive = actual.column(1).as_primitive::<Int32Type>();
+        assert_eq!(actual_list, &list);
+        assert_eq!(actual_primitive, &primitive);
+    }
+
+    #[test]
+    fn test_read_as_dyn_list() {
+        // This test verifies that fixed-size list arrays can be read from Parquet
+        // as variable-length list arrays.
+
+        // [
+        //   [1, 2, 3, null],
+        //   [5, 6, 7, 8],
+        //   null,
+        //   [9, null, 11, 12],
+        //   [null, null, null, null]]
+        let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+            vec![
+                Some(vec![Some(1), Some(2), Some(3), None]),
+                Some(vec![Some(5), Some(6), Some(7), Some(8)]),
+                None,
+                Some(vec![Some(9), None, Some(11), Some(12)]),
+                Some(vec![None, None, None, None]),
+            ],
+            4,
+        );
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "list",
+            ArrowType::FixedSizeList(
+                Arc::new(Field::new("item", ArrowType::Int32, true)),
+                4,
+            ),
+            true,
+        )]));
+
+        // Create record batch with a single fixed-length array column
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list)]).unwrap();
+
+        // Write record batch to Parquet
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
+            .expect("unable to create parquet writer");
+        writer.write(&batch).expect("unable to write record batch");
+        writer.close().expect("unable to close parquet writer");
+
+        // Read record batch from Parquet - ignoring arrow metadata
+        let reader = Bytes::from(buffer);
+        let mut batch_reader = ArrowReaderBuilder::try_new_with_options(
+            reader,
+            ArrowReaderOptions::new().with_skip_arrow_metadata(true),
+        )
+        .expect("unable to create reader builder")
+        .build()
+        .expect("unable to create parquet reader");
+        let actual = batch_reader
+            .next()
+            .expect("missing record batch")
+            .expect("unable to read record batch");
+
+        // Verify the read column is a variable length list with values that match the input
+        let col = actual.column(0).as_list::<i32>();
+        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3), None]),
+            Some(vec![Some(5), Some(6), Some(7), Some(8)]),
+            None,
+            Some(vec![Some(9), None, Some(11), Some(12)]),
+            Some(vec![None, None, None, None]),
+        ]);
+        assert_eq!(col, &expected);
+    }
+}
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index f46f6073a..823084b43 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -35,6 +35,7 @@ mod byte_array;
 mod byte_array_dictionary;
 mod empty_array;
 mod fixed_len_byte_array;
+mod fixed_size_list_array;
 mod list_array;
 mod map_array;
 mod null_array;
@@ -48,6 +49,7 @@ pub use builder::build_array_reader;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
+pub use fixed_size_list_array::FixedSizeListArrayReader;
 pub use list_array::ListArrayReader;
 pub use map_array::MapArrayReader;
 pub use null_array::NullArrayReader;
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index fc5b94603..21b3e7dff 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -42,7 +42,7 @@
 
 use crate::errors::{ParquetError, Result};
 use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, OffsetSizeTrait, StructArray};
+use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait, StructArray};
 use arrow_buffer::NullBuffer;
 use arrow_schema::{DataType, Field};
 use std::ops::Range;
@@ -144,7 +144,8 @@ impl LevelInfoBuilder {
             }
             DataType::List(child)
             | DataType::LargeList(child)
-            | DataType::Map(child, _) => {
+            | DataType::Map(child, _)
+            | DataType::FixedSizeList(child, _) => {
                 let def_level = match field.is_nullable() {
                     true => parent_ctx.def_level + 2,
                     false => parent_ctx.def_level + 1,
@@ -214,6 +215,19 @@ impl LevelInfoBuilder {
                     range,
                 )
             }
+            &DataType::FixedSizeList(_, size) => {
+                let array = array
+                    .as_any()
+                    .downcast_ref::<FixedSizeListArray>()
+                    .expect("unable to get fixed-size list array");
+
+                self.write_fixed_size_list(
+                    size as usize,
+                    array.nulls(),
+                    array.values(),
+                    range,
+                )
+            }
             _ => unreachable!(),
         }
     }
@@ -371,6 +385,100 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write levels for rows `range` of a FixedSizeListArray with child data `values`, null bitmap `nulls` and `fixed_size` values per row.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(), // any other builder variant is treated as a bug
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size; // each row spans `fixed_size` child values
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    let row_indices = (0..fixed_size)
+                        .rev()
+                        .cycle()
+                        .take(values_end - values_start); // fixed_size-1, .., 1, 0 once per row
+
+                    // Step backward over the child rep levels and mark the start of each list
+                    rep_levels
+                        .iter_mut()
+                        .rev()
+                        // Filter out reps from nested children (only `ctx.rep_level` entries are counted)
+                        .filter(|&&mut r| r == ctx.rep_level)
+                        .zip(row_indices)
+                        .for_each(|(r, idx)| {
+                            if idx == 0 { // position 0 is the start of a list
+                                *r = ctx.rep_level - 1;
+                            }
+                        });
+                })
+            };
+
+        // If list size is 0, ignore values and just write one rep/def level pair per row.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx; // number of rows in the run
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                if fixed_size > 0 { // rows hold child values
+                    write_non_null(child, start_idx, end_idx)
+                } else {
+                    write_empty(child, start_idx, end_idx)
+                }
+            };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None; // first row of the pending run of valid rows, if any
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(idx);
+                    } else {
+                        // Write out any pending valid rows
+                        if let Some(start) = start_idx.take() {
+                            write_rows(child, start, idx);
+                        }
+                        // Add null row: one rep/def level pair per leaf, no child values
+                        child.visit_leaves(|leaf| {
+                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                            rep_levels.push(ctx.rep_level - 1);
+                            let def_levels = leaf.def_levels.as_mut().unwrap();
+                            def_levels.push(ctx.def_level - 2);
+                        })
+                    }
+                }
+                // Write out any remaining valid rows
+                if let Some(start) = start_idx.take() {
+                    write_rows(child, start, range.end);
+                }
+            }
+            // If all rows are valid then write the whole array
+            None => write_rows(child, range.start, range.end),
+        }
+    }
+
     /// Write a primitive array, as defined by [`is_leaf`]
     fn write_leaf(&mut self, array: &dyn Array, range: Range<usize>) {
         let info = match self {
@@ -1397,4 +1505,260 @@ mod tests {
 
         assert_eq!(&levels[1], &expected_level);
     }
+
+    #[test]
+    fn test_fixed_size_list() {
+        // [[1, 2], null, null, [7, 8], null] (child values are appended even for null rows)
+        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
+        builder.values().append_slice(&[1, 2]);
+        builder.append(true);
+        builder.values().append_slice(&[3, 4]);
+        builder.append(false);
+        builder.values().append_slice(&[5, 6]);
+        builder.append(false);
+        builder.values().append_slice(&[7, 8]);
+        builder.append(true);
+        builder.values().append_slice(&[9, 10]);
+        builder.append(false);
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 1..4); // rows 1, 2 and 3 only: null, null, [7, 8]
+        let levels = builder.finish();
+
+        assert_eq!(levels.len(), 1);
+
+        let list_level = levels.get(0).unwrap();
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 3, 3]), // 0 = null list, 3 = non-null value
+            rep_levels: Some(vec![0, 0, 0, 1]), // 0 starts a new row, 1 continues it
+            non_null_indices: vec![6, 7], // child value positions holding 7 and 8
+            max_def_level: 3,
+            max_rep_level: 1,
+        };
+        assert_eq!(list_level, &expected_level);
+    }
+
+    #[test]
+    fn test_fixed_size_list_of_struct() {
+        // define schema: nullable fixed-size list (size 2) of nullable struct {a: nullable Int32, b: non-null Int64}
+        let field_a = Field::new("a", DataType::Int32, true);
+        let field_b = Field::new("b", DataType::Int64, false);
+        let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
+        let item_field = Field::new("item", DataType::Struct(fields.clone()), true);
+        let list_field = Field::new(
+            "list",
+            DataType::FixedSizeList(Arc::new(item_field), 2),
+            true,
+        );
+
+        let builder_a = Int32Builder::with_capacity(10);
+        let builder_b = Int64Builder::with_capacity(10);
+        let struct_builder =
+            StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
+        let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
+
+        // [
+        //   [{a: 1, b: 2}, null],
+        //   null,
+        //   [null, null],
+        //   [{a: null, b: 3}, {a: 2, b: 4}]
+        // ]
+
+        // [{a: 1, b: 2}, null]
+        let values = list_builder.values();
+        // {a: 1, b: 2}
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_value(1);
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(2);
+        values.append(true);
+        // null (a placeholder 0 is appended for the non-nullable `b`)
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_null();
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(0);
+        values.append(false);
+        list_builder.append(true);
+
+        // null (two child structs are still appended for the null list)
+        let values = list_builder.values();
+        // null
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_null();
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(0);
+        values.append(false);
+        // null
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_null();
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(0);
+        values.append(false);
+        list_builder.append(false);
+
+        // [null, null]
+        let values = list_builder.values();
+        // null
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_null();
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(0);
+        values.append(false);
+        // null
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_null();
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(0);
+        values.append(false);
+        list_builder.append(true);
+
+        // [{a: null, b: 3}, {a: 2, b: 4}]
+        let values = list_builder.values();
+        // {a: null, b: 3}
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_null();
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(3);
+        values.append(true);
+        // {a: 2, b: 4}
+        values
+            .field_builder::<Int32Builder>(0)
+            .unwrap()
+            .append_value(2);
+        values
+            .field_builder::<Int64Builder>(1)
+            .unwrap()
+            .append_value(4);
+        values.append(true);
+        list_builder.append(true);
+
+        let array = Arc::new(list_builder.finish());
+
+        assert_eq!(array.values().len(), 8); // 4 rows * 2 structs, null row included
+        assert_eq!(array.len(), 4);
+
+        let schema = Arc::new(Schema::new(vec![list_field]));
+        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
+        let a_levels = &levels[0];
+        let b_levels = &levels[1];
+
+        // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
+        let expected_a = LevelInfo {
+            def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]), // 4 = a set, 3 = a null, 2 = null struct, 0 = null list
+            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
+            non_null_indices: vec![0, 7],
+            max_def_level: 4,
+            max_rep_level: 1,
+        };
+        // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
+        let expected_b = LevelInfo {
+            def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]), // 3 = b set, 2 = null struct, 0 = null list
+            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
+            non_null_indices: vec![0, 6, 7],
+            max_def_level: 3,
+            max_rep_level: 1,
+        };
+
+        assert_eq!(a_levels, &expected_a);
+        assert_eq!(b_levels, &expected_b);
+    }
+
+    #[test]
+    fn test_fixed_size_list_empty() {
+        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0); // list size 0: rows hold no child values
+        builder.append(true); // []
+        builder.append(false); // null
+        builder.append(true); // []
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 0..3);
+        let levels = builder.finish();
+
+        assert_eq!(levels.len(), 1);
+
+        let list_level = levels.get(0).unwrap();
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![1, 0, 1]), // 1 = valid but empty list, 0 = null list
+            rep_levels: Some(vec![0, 0, 0]), // every entry starts a new row
+            non_null_indices: vec![], // no child values are written
+            max_def_level: 3,
+            max_rep_level: 1,
+        };
+        assert_eq!(list_level, &expected_level);
+    }
+
+    #[test]
+    fn test_fixed_size_list_of_var_lists() {
+        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
+        let mut builder =
+            FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
+        builder.values().append_value([Some(1), None, Some(3)]);
+        builder.values().append_null();
+        builder.append(true);
+        builder.values().append_value([Some(4)]);
+        builder.values().append_value([]);
+        builder.append(true);
+        builder.values().append_value([Some(5), Some(6)]);
+        builder.values().append_value([None, None]);
+        builder.append(true);
+        builder.values().append_null(); // two null inner lists are appended under the null outer row
+        builder.values().append_null();
+        builder.append(false);
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 0..4);
+        let levels = builder.finish();
+
+        let list_level = levels.get(0).unwrap();
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]), // 5 = value, 4 = null value, 3 = empty inner list, 2 = null inner list, 0 = null outer list
+            rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]), // 0 = new row, 1 = next inner list in the row, 2 = next value in the inner list
+            non_null_indices: vec![0, 2, 3, 4, 5], // leaf positions of 1, 3, 4, 5, 6
+            max_def_level: 5,
+            max_rep_level: 2,
+        };
+
+        assert_eq!(list_level, &expected_level);
+    }
 }
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index af8202182..cfad15550 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
 
 use arrow_array::cast::AsArray;
 use arrow_array::types::{Decimal128Type, Int32Type, Int64Type, UInt32Type, UInt64Type};
-use arrow_array::{types, Array, ArrayRef, RecordBatch, RecordBatchWriter};
+use arrow_array::{
+    types, Array, ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchWriter,
+};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};
 
 use super::schema::{
@@ -380,7 +382,17 @@ fn write_leaves<W: Write>(
         ArrowDataType::Float16 => Err(ParquetError::ArrowError(
             "Float16 arrays not supported".to_string(),
         )),
-        ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
+        ArrowDataType::FixedSizeList(_, _) => {
+            let arrays: Vec<_> = arrays.iter().map(|array|{
+                array.as_any().downcast_ref::<FixedSizeListArray>()
+                    .expect("unable to get fixed-size list array")
+                    .values()
+                    .clone()
+            }).collect();
+            write_leaves(row_group_writer, &arrays, levels)?;
+            Ok(())
+        },
+        ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
             Err(ParquetError::NYI(
                 format!(
                     "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"