You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "dexterduck (via GitHub)" <gi...@apache.org> on 2023/05/23 20:18:21 UTC

[GitHub] [arrow-rs] dexterduck commented on a diff in pull request #4267: Parquet Reader/writer for fixed-size list arrays

dexterduck commented on code in PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#discussion_r1202951391


##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list

Review Comment:
   It was unclear to me whether the Arrow spec allows a fixed-size list of length zero. It seems to be supported: pyarrow will read and write zero-length lists, and a zero-length fixed-size list encodes to the same def/rep levels as an empty variable-size list.
   
   However, I'd be happy to change the implementation to emit an error if that behavior isn't desired!



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;

Review Comment:
   Correct me if I'm wrong, but my understanding is that the child elements' write methods should handle writing the leaf repetition levels. So the fixed-size list writer should only need to set the repetition level that encodes the start of each row.



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)
+        } else {
+            &write_empty as _
+        };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None;
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of one

Review Comment:
   Shouldn't the repetition level inside each row be handled by the child elements? My implementation follows the same logic as `write_struct`, but maybe I'm missing something?



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -1397,4 +1504,160 @@ mod tests {
 
         assert_eq!(&levels[1], &expected_level);
     }
+
+    #[test]
+    fn test_fixed_size_list() {
+        // [[1, 2], null, null, [7, 8], null]
+        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
+        builder.values().append_slice(&[1, 2]);
+        builder.append(true);
+        builder.values().append_slice(&[3, 4]);
+        builder.append(false);
+        builder.values().append_slice(&[5, 6]);
+        builder.append(false);
+        builder.values().append_slice(&[7, 8]);
+        builder.append(true);
+        builder.values().append_slice(&[9, 10]);
+        builder.append(false);
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 1..4);
+        let levels = builder.finish();
+
+        assert_eq!(levels.len(), 1);
+
+        let list_level = levels.get(0).unwrap();
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 3, 3]),
+            rep_levels: Some(vec![0, 0, 0, 1]),

Review Comment:
   I believe that the original is correct since the levels are generated by `builder.write(&a, 1..4)`.
   
   So the levels should correspond to the rows at (zero-based) indices 1, 2, and 3:
   - def `0`, rep `0` (row 1, null)
   - def `0`, rep `0` (row 2, null)
   - def `3`, rep `0` (row 3, child 0)
   - def `3`, rep `1` (row 3, child 1)
   
   Is that not right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org