You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/05/23 18:08:09 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4267: Parquet Reader/writer for fixed-size list arrays

tustvold commented on code in PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#discussion_r1202763507


##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;

Review Comment:
   You will need to add the Apache license header to this new file to placate RAT (the Apache Release Audit Tool check)



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list

Review Comment:
   I think an empty list should be an error?



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row

Review Comment:
   I think we need to track the size of each list row, and ensure that each is the expected length



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)
+        } else {
+            &write_empty as _
+        };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None;
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of one

Review Comment:
   I don't think coalescing the rows like this is correct, as the repetition level of each row must be different



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)

Review Comment:
   Using `dyn` dispatch is likely to make it harder for LLVM to lift the branch out of the loop. A simple `if` block will perform the same (if not better) and will have the advantage of being more likely to be optimised out



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;

Review Comment:
   This doesn't seem to be correct: only the first element should be `ctx.rep_level - 1`; the rest should be `ctx.rep_level`



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -158,6 +158,18 @@ impl LevelInfoBuilder {
                 let child = Self::try_new(child.as_ref(), ctx)?;
                 Ok(Self::List(Box::new(child), ctx))
             }
+            DataType::FixedSizeList(child, _) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 2,
+                    false => parent_ctx.def_level + 1,
+                };
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level + 1,
+                    def_level,
+                };
+                let child = Self::try_new(child.as_ref(), ctx)?;
+                Ok(Self::List(Box::new(child), ctx))

Review Comment:
   This appears to be identical to the block above? Perhaps we could combine them?



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -1397,4 +1504,160 @@ mod tests {
 
         assert_eq!(&levels[1], &expected_level);
     }
+
+    #[test]
+    fn test_fixed_size_list() {
+        // [[1, 2], null, null, [7, 8], null]
+        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
+        builder.values().append_slice(&[1, 2]);
+        builder.append(true);
+        builder.values().append_slice(&[3, 4]);
+        builder.append(false);
+        builder.values().append_slice(&[5, 6]);
+        builder.append(false);
+        builder.values().append_slice(&[7, 8]);
+        builder.append(true);
+        builder.values().append_slice(&[9, 10]);
+        builder.append(false);
+        let a = builder.finish();
+
+        let item_field = Field::new("item", a.data_type().clone(), true);
+        let mut builder =
+            LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap();
+        builder.write(&a, 1..4);
+        let levels = builder.finish();
+
+        assert_eq!(levels.len(), 1);
+
+        let list_level = levels.get(0).unwrap();
+
+        let expected_level = LevelInfo {
+            def_levels: Some(vec![0, 0, 3, 3]),
+            rep_levels: Some(vec![0, 0, 0, 1]),

Review Comment:
   ```suggestion
               rep_levels: Some(vec![0, 1, 0, 1]),
   ```
   As written it is not correct



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list
+                            validity.append(*d + 1 == self.def_level);
+                        }
+                    }
+                    child_idx += 1;
+                }
+            }
+            Ok(())
+        })?;
+
+        let child_data = match start_idx {
+            Some(0) => {
+                // No null entries - can reuse original array
+                next_batch_array.to_data()
+            }
+            Some(start) => {
+                // Flush pending child items
+                child_data_builder.extend(0, start, child_idx);
+                child_data_builder.freeze()
+            }
+            None => child_data_builder.freeze(),
+        };
+
+        let mut list_builder = ArrayData::builder(self.get_data_type().clone())

Review Comment:
   We could probably do with a sanity check that `list_len * self.fixed_size == child_data.len()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org