Posted to github@arrow.apache.org by "dexterduck (via GitHub)" <gi...@apache.org> on 2023/05/23 14:51:24 UTC

[GitHub] [arrow-rs] dexterduck opened a new pull request, #4267: Parquet Reader/writer for fixed-size list arrays

dexterduck opened a new pull request, #4267:
URL: https://github.com/apache/arrow-rs/pull/4267

   # Which issue does this PR close?
   
   Closes #4214 
   
   # What changes are included in this PR?
   
   - Parquet writer support for fixed-size lists
   - Parquet reader support for fixed-size lists
   
   Reader and writer both include unit tests.   
   I also sanity-checked the reader and writer against `pyarrow` (although I wasn't able to verify writing lists with nulls against pyarrow due to [this](https://github.com/apache/arrow/issues/35692) outstanding issue).
   
   # Are there any user-facing changes?
   
   None other than the added type support
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold merged pull request #4267: Parquet Reader/writer for fixed-size list arrays

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267




[GitHub] [arrow-rs] dexterduck commented on a diff in pull request #4267: Parquet Reader/writer for fixed-size list arrays

Posted by "dexterduck (via GitHub)" <gi...@apache.org>.
dexterduck commented on code in PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#discussion_r1202951391


##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list

Review Comment:
   It was unclear to me whether the Arrow spec allows a fixed-size list of length zero. It appears to be supported: pyarrow will read and write zero-length lists, and a zero-length fixed-size list encodes to the same def/rep levels as an empty dynamically sized list.
   
   However, I'd be happy to change the implementation to emit an error if that behavior isn't desired!
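
A minimal sketch of how a reader could tell an empty row from a null row using only the definition level. It mirrors the comparisons `*d >= self.def_level` and `*d + 1 == self.def_level` that appear in the reader in this PR. The names `RowKind` and `classify_row` are hypothetical and are not part of arrow-rs.

```rust
/// How one list row is encoded by the definition level of its first entry.
#[derive(Debug, PartialEq, Eq)]
enum RowKind {
    /// The list itself is null (or an ancestor is null).
    Null,
    /// The list is present but has no items, e.g. a zero-length fixed-size list.
    Empty,
    /// The list is present and has at least one item slot.
    HasItems,
}

/// `list_def_level` is assumed to be the definition level at which the
/// list has at least one item, matching the reader's `def_level` field.
fn classify_row(def: i16, list_def_level: i16) -> RowKind {
    if def >= list_def_level {
        RowKind::HasItems
    } else if def + 1 == list_def_level {
        // One level below "has items" means "present but empty".
        RowKind::Empty
    } else {
        RowKind::Null
    }
}
```

Under this reading, a zero-length fixed-size list and an empty variable-size list produce the same level, so nothing in the encoding itself distinguishes them.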



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;

Review Comment:
   Correct me if I'm wrong, but my understanding is that the child elements' write methods should handle writing the leaf repetition levels, so the fixed-size list writer should only need to set the repetition level that marks the start of each row.
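
A rough, standalone sketch of the row-start marking described here. It assumes that every child item contributes exactly one leaf level, which only holds when the child is not itself repeated. The function `mark_row_starts` and its parameters are hypothetical, not the arrow-rs API.

```rust
/// Overwrite the repetition level at the first item of each of the last
/// `num_rows` rows in `rep_levels`, leaving the other levels as the child
/// writer produced them.
///
/// Assumes `fixed_size` leaf levels were appended per row.
fn mark_row_starts(
    rep_levels: &mut [i16],
    num_rows: usize,
    fixed_size: usize,
    list_rep_level: i16,
) {
    if fixed_size == 0 {
        // Zero-sized rows have no child items to mark.
        return;
    }
    let written = num_rows * fixed_size;
    assert!(written <= rep_levels.len(), "fewer levels than expected");
    // Index of the first level written for this range of rows.
    let start = rep_levels.len() - written;
    for i in 0..num_rows {
        rep_levels[start + i * fixed_size] = list_rep_level - 1;
    }
}
```

For example, with `fixed_size = 2` and `list_rep_level = 1`, six child levels of `1` become `[0, 1, 0, 1, 0, 1]`.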



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)
+        } else {
+            &write_empty as _
+        };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None;
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of one

Review Comment:
   Shouldn't the repetition level inside each row be handled by the child elements? My implementation seems to match the same logic used by `write_struct`, but maybe I'm missing something?



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -1397,4 +1504,160 @@ mod tests {
 
         assert_eq!(&levels[1], &expected_level);
     }
+
+    #[test]
+    fn test_fixed_size_list() {
+        // [[1, 2], null, null, [7, 8], null]
+        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
+        builder.values().append_slice(&[1, 2]);
+        builder.append(true);
+        builder.values().append_slice(&[3, 4]);
+        builder.append(false);
+        builder.values().append_slice(&[5, 6]);
+        builder.append(false);
+        builder.values().append_slice(&[7, 8]);
+        builder.append(true);
+        builder.values().append_slice(&[9, 10]);
+        builder.append(false);
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 1..4);
+        let levels = builder.finish();
+
+        assert_eq!(levels.len(), 1);
+
+        let list_level = levels.get(0).unwrap();
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 3, 3]),
+            rep_levels: Some(vec![0, 0, 0, 1]),

Review Comment:
   I believe that the original is correct since the levels are generated by `builder.write(&a, 1..4)`.
   
   So the levels should correspond to rows 1, 2, and 3 (zero-indexed):
   - def `0`, rep `0` (row 1, null)
   - def `0`, rep `0` (row 2, null)
   - def `3`, rep `0` (row 3, child 0)
   - def `3`, rep `1` (row 3, child 1)
   
   Is that not right?
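
The walk-through above can be reproduced with a small standalone sketch. It hard-codes the levels seen in the test's expected output (definition level `0` for a null row, `3` for a present item, repetition level `1` inside a row) and assumes `fixed_size > 0` with no null items. The function `fixed_size_list_levels` is hypothetical and is not how `LevelInfoBuilder` is implemented.

```rust
use std::ops::Range;

/// Compute (def_levels, rep_levels) for the rows in `range` of a nullable
/// fixed-size list whose items are all non-null.
fn fixed_size_list_levels(
    validity: &[bool],
    fixed_size: usize,
    range: Range<usize>,
) -> (Vec<i16>, Vec<i16>) {
    // Assumed levels, taken from the expected output of the test.
    const NULL_LIST_DEF: i16 = 0;
    const PRESENT_ITEM_DEF: i16 = 3;

    let mut def = Vec::new();
    let mut rep = Vec::new();
    for row in range {
        if validity[row] {
            for i in 0..fixed_size {
                def.push(PRESENT_ITEM_DEF);
                // The first item starts a new row, the rest continue it.
                rep.push(if i == 0 { 0 } else { 1 });
            }
        } else {
            // A null row contributes a single level pair.
            def.push(NULL_LIST_DEF);
            rep.push(0);
        }
    }
    (def, rep)
}
```

Calling it with validity `[true, false, false, true, false]`, `fixed_size = 2`, and range `1..4` gives def levels `[0, 0, 3, 3]` and rep levels `[0, 0, 0, 1]`, matching the four entries listed above.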





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4267: Parquet Reader/writer for fixed-size list arrays

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#discussion_r1203947304


##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+        let mut row_len = 0;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                    row_len += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    // Length of the previous row should be equal to:
+                    // - the list's fixed size (valid entries)
+                    // - zero (null entries, start of array)
+                    // Any other length indicates invalid data
+                    if row_len != 0 && row_len != self.fixed_size {
+                        return Err(general_err!(
+                            "Encountered misaligned row with length {} (expected length {})",
+                            row_len,
+                            self.fixed_size
+                        ))
+                    }
+                    row_len = 0;
+
+                    if *d >= self.def_level {
+                        row_len += 1;
+
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list
+                            validity.append(*d + 1 == self.def_level);
+                        }
+                    }
+                    child_idx += 1;
+                }
+            }
+            Ok(())
+        })?;
+
+        let child_data = match start_idx {
+            Some(0) => {
+                // No null entries - can reuse original array
+                next_batch_array.to_data()
+            }
+            Some(start) => {
+                // Flush pending child items
+                child_data_builder.extend(0, start, child_idx);
+                child_data_builder.freeze()
+            }
+            None => child_data_builder.freeze(),
+        };
+        assert_eq!(list_len * self.fixed_size, child_data.len());

Review Comment:
   I think this should return an error instead of panicking, since it can be hit if the final row is the wrong length.
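
As an illustration of the requested change, the length check could be written as a fallible helper rather than an `assert_eq!`. This is a sketch only: `check_child_len` is a hypothetical name, and a plain `String` stands in for the crate's `ParquetError`.

```rust
/// Verify that the child array holds exactly `fixed_size` items per row,
/// returning an error instead of panicking on malformed input.
fn check_child_len(
    list_len: usize,
    fixed_size: usize,
    child_len: usize,
) -> Result<(), String> {
    let expected = list_len
        .checked_mul(fixed_size)
        .ok_or_else(|| "list length overflows usize".to_string())?;
    if expected != child_len {
        return Err(format!(
            "expected {} child items ({} rows of size {}), found {}",
            expected, list_len, fixed_size, child_len
        ));
    }
    Ok(())
}
```

A caller would propagate the result with `?`, so a truncated final row surfaces as a read error rather than aborting the process.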



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;

Review Comment:
   Aah, I misread this the first time. I believe the logic isn't quite correct when the FixedSizeList has a repeated child.
   
   In particular, it makes the erroneous assumption that there will be `fixed_size` leaves.
   
   I wonder if we could make `write_list` have the signature
   
   ```
   fn write_list<I, O: OffsetSizeTrait>(
       &mut self,
       ranges: I,
       nulls: Option<&NullBuffer>,
       values: &dyn Array,
   ) where I: IntoIterator<Item = (O, O)>
   ```
   
   And use this to allow sharing the same logic :thinking: 
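
To make the idea concrete, here is a sketch of how both list kinds could be turned into the `(start, end)` pairs that such a `ranges` parameter would consume. The helper names are hypothetical, and plain integers are used in place of `OffsetSizeTrait`, so this only illustrates the shape of the iterator and is not the eventual implementation.

```rust
use std::ops::Range;

/// Child ranges for a fixed-size list: row `i` covers
/// `i * fixed_size .. (i + 1) * fixed_size`.
fn fixed_size_ranges(
    fixed_size: usize,
    rows: Range<usize>,
) -> impl Iterator<Item = (usize, usize)> {
    rows.map(move |i| (i * fixed_size, (i + 1) * fixed_size))
}

/// Child ranges for a variable-size list: consecutive pairs of offsets.
fn offset_ranges(offsets: &[i32]) -> impl Iterator<Item = (i32, i32)> + '_ {
    offsets.windows(2).map(|w| (w[0], w[1]))
}
```

With both producing the same item shape, a single level-writing routine could iterate over either one without knowing how the ranges were derived.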



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list

Review Comment:
   Aah, this is a good point. A zero-sized fixed-size list is certainly a peculiar construction, but I don't think it is technically invalid.



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+        let mut row_len = 0;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                    row_len += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    // Length of the previous row should be equal to:
+                    // - the list's fixed size (valid entries)
+                    // - zero (null entries, start of array)
+                    // Any other length indicates invalid data
+                    if row_len != 0 && row_len != self.fixed_size {

Review Comment:
   ```suggestion
                       if start_idx.is_some() && row_len != self.fixed_size {
   ```



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)
+        } else {
+            &write_empty as _
+        };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None;
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of one

Review Comment:
   You are quite correct, it has been a while since I've had to touch this code :smile: 





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4267: Parquet Reader/writer for fixed-size list arrays

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#discussion_r1202763507


##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;

Review Comment:
   You will need to add an Apache license header to this file to placate RAT (the Apache Release Audit Tool).



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list

Review Comment:
   I think an empty list should be an error?
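
One possible reading of this suggestion, as a minimal sketch rather than the PR's actual code: classify each row-start definition level relative to the list's `def_level`, and reject the "empty list" level whenever the fixed size is non-zero. `RowKind` and `classify_row` are hypothetical names used only for illustration, and the sketch assumes `def_level - 1` encodes an empty list, as the "Valid if empty list" comment in the diff implies.

```rust
/// Hypothetical classification of a list row from its definition level.
#[derive(Debug, PartialEq)]
enum RowKind {
    /// The list is present.
    Valid,
    /// The list (or one of its ancestors) is null.
    Null,
}

/// Classify the row that starts with definition level `d`.
///
/// Assumption: `def_level` is the level at which the list is non-null and
/// non-empty, so `def_level - 1` encodes an empty list.
fn classify_row(d: i16, def_level: i16, fixed_size: usize) -> Result<RowKind, String> {
    if d >= def_level {
        Ok(RowKind::Valid)
    } else if d + 1 == def_level {
        if fixed_size == 0 {
            // Zero-sized lists are legitimately empty
            Ok(RowKind::Valid)
        } else {
            // An empty list cannot satisfy a non-zero fixed size
            Err(format!("encountered empty list, expected {} items", fixed_size))
        }
    } else {
        Ok(RowKind::Null)
    }
}
```

With `def_level = 2` and `fixed_size = 2`, a level of 2 is a valid row, 0 is a null row, and 1 is reported as an error instead of being silently padded.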



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row

Review Comment:
   I think we need to track the size of each list row, and ensure that each is the expected length
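
A rough sketch of what per-row length tracking could look like, independent of the arrow-rs types. `check_row_lengths` is a hypothetical helper, and it simplifies the real reader: it assumes a single leaf whose items are not themselves repeated (no repetition level above `rep_level`) and a non-zero `fixed_size`.

```rust
/// Check that every valid list row holds exactly `fixed_size` items and
/// return the number of rows (valid and null) that were seen.
///
/// Simplifying assumptions: one level entry per child item, and
/// `fixed_size > 0`.
fn check_row_lengths(
    def_levels: &[i16],
    rep_levels: &[i16],
    def_level: i16,
    rep_level: i16,
    fixed_size: usize,
) -> Result<usize, String> {
    let mut rows = 0usize;
    // Length of the valid row currently being read, if any
    let mut current: Option<usize> = None;

    for (d, r) in def_levels.iter().zip(rep_levels) {
        if *r < rep_level {
            // Start of a new row: the previous valid row must be complete
            if let Some(len) = current.take() {
                if len != fixed_size {
                    return Err(format!("row has {} items, expected {}", len, fixed_size));
                }
            }
            rows += 1;
            if *d >= def_level {
                // The entry that opens a valid row is also its first item
                current = Some(1);
            }
        } else if let Some(len) = current.as_mut() {
            // Another item inside the current valid row
            *len += 1;
        } else {
            return Err("found an item that does not belong to a valid row".to_string());
        }
    }

    // The final row is never followed by a row start, so check it here
    match current {
        Some(len) if len != fixed_size => {
            Err(format!("row has {} items, expected {}", len, fixed_size))
        }
        _ => Ok(rows),
    }
}
```

For `[[1, 2], null, [3, 4]]` with `def_level = 2` and `rep_level = 1`, the levels `def = [3, 3, 0, 3, 3]` and `rep = [0, 1, 0, 0, 1]` pass the check, while a row with three items is rejected.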



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)
+        } else {
+            &write_empty as _
+        };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None;
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of one

Review Comment:
   I don't think coercing the rows like this is correct, as the repetition levels of each row must be different.



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)

Review Comment:
   Using dyn dispatch is likely to make it harder for LLVM to lift the branch out of the loop. A simple `if` block will perform the same (if not better) and has the advantage of being more likely to be optimised out.
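
To illustrate the shape being suggested, here is a small stand-alone sketch. The helpers `non_empty_entries`, `empty_entries` and `count_level_entries` are hypothetical and only count level entries for a primitive child; the point is the plain branch on `fixed_size` inside the loop instead of a call through `&dyn Fn`.

```rust
use std::ops::Range;

/// Level entries produced by a run of non-empty rows (hypothetical helper,
/// assumes one entry per child item).
fn non_empty_entries(rows: usize, fixed_size: usize) -> usize {
    rows * fixed_size
}

/// A zero-sized row still needs one rep/def level entry (hypothetical helper).
fn empty_entries(rows: usize) -> usize {
    rows
}

/// Count the level entries written for a set of runs of valid rows.
fn count_level_entries(runs: &[Range<usize>], fixed_size: usize) -> usize {
    let mut total = 0;
    for run in runs {
        let rows = run.end - run.start;
        // `fixed_size` does not change inside the loop, so this is an
        // ordinary, statically dispatched branch on a loop-invariant value
        total += if fixed_size > 0 {
            non_empty_entries(rows, fixed_size)
        } else {
            empty_entries(rows)
        };
    }
    total
}
```

Both arms are direct calls, which gives the compiler the option of inlining them and hoisting the condition; whether it does so in practice would need to be confirmed by inspecting the generated code.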



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;

Review Comment:
   This doesn't seem to be correct: only the first element of each list should be `ctx.rep_level - 1`, and the rest should be `ctx.rep_level`.
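
For reference, the intended level layout can be sketched in isolation. `mark_row_starts` is a hypothetical helper, not the PR's code; it assumes the child writer has already emitted `rep_level` for every item and that each item produced exactly one level entry, which would not hold for a variable-length child.

```rust
/// Overwrite the repetition level at the start of each fixed-size row.
///
/// Assumptions: `rep_levels[start..]` already holds `rep_level` for every
/// child item, and each item contributed exactly one entry.
fn mark_row_starts(
    rep_levels: &mut [i16],
    start: usize,
    num_rows: usize,
    fixed_size: usize,
    rep_level: i16,
) {
    for i in 0..num_rows {
        // The first item of a row starts a new list at the parent's level
        rep_levels[start + i * fixed_size] = rep_level - 1;
    }
}
```

Starting from six entries of `1` and marking two rows of size 2 from index 2 gives `[1, 1, 0, 1, 0, 1]`: each row begins with `rep_level - 1` and continues with `rep_level`.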



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -158,6 +158,18 @@ impl LevelInfoBuilder {
                 let child = Self::try_new(child.as_ref(), ctx)?;
                 Ok(Self::List(Box::new(child), ctx))
             }
+            DataType::FixedSizeList(child, _) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 2,
+                    false => parent_ctx.def_level + 1,
+                };
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level + 1,
+                    def_level,
+                };
+                let child = Self::try_new(child.as_ref(), ctx)?;
+                Ok(Self::List(Box::new(child), ctx))

Review Comment:
   This appears to be identical to the block above? Perhaps we could combine them?
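
A sketch of how the two arms might be merged with an or-pattern. `ToyType` is a hypothetical stand-in for `arrow_schema::DataType`, `LevelContext` is reduced to the two fields shown in the diff, and `leaf_context` is an illustrative function rather than the real `LevelInfoBuilder::try_new`.

```rust
/// Toy stand-in for `arrow_schema::DataType`, reduced to three variants.
enum ToyType {
    Int32,
    List(Box<ToyType>),
    FixedSizeList(Box<ToyType>, i32),
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct LevelContext {
    rep_level: i16,
    def_level: i16,
}

/// Compute the level context of the innermost leaf.
fn leaf_context(data_type: &ToyType, nullable: bool, parent: LevelContext) -> LevelContext {
    match data_type {
        ToyType::Int32 => LevelContext {
            rep_level: parent.rep_level,
            def_level: parent.def_level + nullable as i16,
        },
        // Both list flavours bind `child` with the same type, so one arm
        // can serve both variants
        ToyType::List(child) | ToyType::FixedSizeList(child, _) => {
            let def_level = match nullable {
                true => parent.def_level + 2,
                false => parent.def_level + 1,
            };
            let ctx = LevelContext {
                rep_level: parent.rep_level + 1,
                def_level,
            };
            // Toy simplification: children are always treated as nullable
            leaf_context(child, true, ctx)
        }
    }
}
```

An or-pattern compiles only when every alternative binds the same names with the same types, which is the case here because both variants carry the child as their first field.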



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -1397,4 +1504,160 @@ mod tests {
 
         assert_eq!(&levels[1], &expected_level);
     }
+
+    #[test]
+    fn test_fixed_size_list() {
+        // [[1, 2], null, null, [7, 8], null]
+        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
+        builder.values().append_slice(&[1, 2]);
+        builder.append(true);
+        builder.values().append_slice(&[3, 4]);
+        builder.append(false);
+        builder.values().append_slice(&[5, 6]);
+        builder.append(false);
+        builder.values().append_slice(&[7, 8]);
+        builder.append(true);
+        builder.values().append_slice(&[9, 10]);
+        builder.append(false);
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 1..4);
+        let levels = builder.finish();
+
+        assert_eq!(levels.len(), 1);
+
+        let list_level = levels.get(0).unwrap();
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 3, 3]),
+            rep_levels: Some(vec![0, 0, 0, 1]),

Review Comment:
   ```suggestion
               rep_levels: Some(vec![0, 1, 0, 1]),
   ```
   As written it is not correct



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list
+                            validity.append(*d + 1 == self.def_level);
+                        }
+                    }
+                    child_idx += 1;
+                }
+            }
+            Ok(())
+        })?;
+
+        let child_data = match start_idx {
+            Some(0) => {
+                // No null entries - can reuse original array
+                next_batch_array.to_data()
+            }
+            Some(start) => {
+                // Flush pending child items
+                child_data_builder.extend(0, start, child_idx);
+                child_data_builder.freeze()
+            }
+            None => child_data_builder.freeze(),
+        };
+
+        let mut list_builder = ArrayData::builder(self.get_data_type().clone())

Review Comment:
   We could probably do with a sanity check that `list_len * fixed_size == child_data.len()`.
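
Such a check could look roughly like the following. `check_child_len` is a hypothetical free function operating on plain lengths; in the reader it would presumably return a `ParquetError` rather than a `String`.

```rust
/// Verify that the child array holds exactly `fixed_size` items per list row.
fn check_child_len(list_len: usize, fixed_size: usize, child_len: usize) -> Result<(), String> {
    match list_len.checked_mul(fixed_size) {
        Some(expected) if expected == child_len => Ok(()),
        Some(expected) => Err(format!(
            "expected {} child items for {} rows of size {}, found {}",
            expected, list_len, fixed_size, child_len
        )),
        // Guard against overflow instead of silently wrapping
        None => Err("list_len * fixed_size overflows usize".to_string()),
    }
}
```

Running the check just before the list array is built would turn a malformed child array into an error instead of an invalid `FixedSizeListArray`.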





[GitHub] [arrow-rs] dexterduck commented on pull request #4267: Parquet Reader/writer for fixed-size list arrays

Posted by "dexterduck (via GitHub)" <gi...@apache.org>.
dexterduck commented on PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#issuecomment-1561257981

   @tustvold I've finally understood the encoding logic error you were pointing out 😅. 
   I added tests for both reader and writer to verify correct encoding/decoding of fixed-length lists containing variable-length lists, and the incorrect behavior on the write side should now be fixed.




[GitHub] [arrow-rs] dexterduck commented on pull request #4267: Parquet Reader/writer for fixed-size list arrays

Posted by "dexterduck (via GitHub)" <gi...@apache.org>.
dexterduck commented on PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#issuecomment-1560068269

   @tustvold thanks for the feedback! 
   
   I implemented most of your corrections, and I added the two tests you suggested.
   I also updated the test that writes a list of structs so that the structs contain 2 fields instead of 1, to try to catch any potential errors in the def/rep level encoding.
   
   Following those changes, I believe the rep/def level writer logic is correct as-is, but it's possible I'm missing something. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org