You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 19:31:25 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #3212: Support StructArray in Row Format (#3159)

alamb commented on code in PR #3212:
URL: https://github.com/apache/arrow-rs/pull/3212#discussion_r1033954065


##########
arrow/src/row/mod.rs:
##########
@@ -332,8 +340,99 @@ mod variable;
 #[derive(Debug)]
 pub struct RowConverter {
     fields: Arc<[SortField]>,
-    /// interning state for column `i`, if column`i` is a dictionary
-    interners: Vec<Option<Box<OrderPreservingInterner>>>,
+    /// State for codecs
+    codecs: Vec<Codec>,
+}
+
+#[derive(Debug)]
+enum Codec {

Review Comment:
   Extracting this into `Codec` is a nice generalization.



##########
arrow/src/row/mod.rs:
##########
@@ -307,6 +308,13 @@ mod variable;
 ///
 ///      Input                  Row Format
 /// ```
+///
+/// ## Struct Encoding
+///
+/// A null is encoded as a `0_u8`.

Review Comment:
   ```suggestion
   /// A null is encoded as a `0_u8` or `0xFF_u8`
   ```



##########
arrow/src/row/mod.rs:
##########
@@ -332,8 +340,99 @@ mod variable;
 #[derive(Debug)]
 pub struct RowConverter {
     fields: Arc<[SortField]>,
-    /// interning state for column `i`, if column`i` is a dictionary
-    interners: Vec<Option<Box<OrderPreservingInterner>>>,
+    /// State for codecs
+    codecs: Vec<Codec>,
+}
+
+#[derive(Debug)]
+enum Codec {
+    /// No additional codec state is necessary
+    Stateless,
+    // The interner used to encode dictionary values
+    Dictionary(OrderPreservingInterner),
+    // A row converter for the child fields
+    // and the encoding of a row containing only nulls
+    Struct(RowConverter, OwnedRow),
+}
+
+impl Codec {
+    fn new(sort_field: &SortField) -> Result<Self> {
+        match &sort_field.data_type {
+            DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())),
+            d if !d.is_nested() => Ok(Self::Stateless),
+            DataType::Struct(f) => {
+                let sort_fields = f
+                    .iter()
+                    .map(|x| {
+                        SortField::new_with_options(
+                            x.data_type().clone(),
+                            sort_field.options,
+                        )
+                    })
+                    .collect();
+
+                let mut converter = RowConverter::new(sort_fields)?;
+                let nulls: Vec<_> =
+                    f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
+
+                let nulls = converter.convert_columns(&nulls)?;
+                let owned = OwnedRow {
+                    data: nulls.buffer,
+                    config: nulls.config,
+                };
+
+                Ok(Self::Struct(converter, owned))
+            }
+            _ => Err(ArrowError::NotYetImplemented(format!(
+                "not yet implemented: {:?}",
+                sort_field.data_type
+            ))),
+        }
+    }
+
+    fn encoder(&mut self, array: &dyn Array) -> Result<Encoder<'_>> {
+        match self {
+            Codec::Stateless => Ok(Encoder::Stateless),
+            Codec::Dictionary(interner) => {
+                let values = downcast_dictionary_array! {
+                    array => array.values(),
+                    _ => unreachable!()
+                };
+
+                let mapping = compute_dictionary_mapping(interner, values)
+                    .into_iter()
+                    .map(|maybe_interned| {
+                        maybe_interned.map(|interned| interner.normalized_key(interned))
+                    })
+                    .collect();
+
+                Ok(Encoder::Dictionary(mapping))
+            }
+            Codec::Struct(converter, null) => {
+                let v = as_struct_array(array);
+                let rows = converter.convert_columns(v.columns())?;

Review Comment:
   Is it true that this code first converts the struct array to row format, and then the individual rows are copied over one by one into the resulting format?
   
   In other words, the row format form of the struct array is copied?



##########
arrow/src/row/mod.rs:
##########
@@ -783,54 +881,64 @@ fn null_sentinel(options: SortOptions) -> u8 {
 }
 
 /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
-fn new_empty_rows(
-    cols: &[ArrayRef],
-    dictionaries: &[Option<Vec<Option<&[u8]>>>],
-    config: RowConfig,
-) -> Rows {
+fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) -> Rows {
     use fixed::FixedLengthEncoding;
 
     let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
     let mut lengths = vec![0; num_rows];
 
-    for (array, dict) in cols.iter().zip(dictionaries) {
-        downcast_primitive_array! {
-            array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
-            DataType::Null => {},
-            DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
-            DataType::Binary => as_generic_binary_array::<i32>(array)
-                .iter()
-                .zip(lengths.iter_mut())
-                .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
-            DataType::LargeBinary => as_generic_binary_array::<i64>(array)
-                .iter()
-                .zip(lengths.iter_mut())
-                .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
-            DataType::Utf8 => as_string_array(array)
-                .iter()
-                .zip(lengths.iter_mut())
-                .for_each(|(slice, length)| {
-                    *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
-                }),
-            DataType::LargeUtf8 => as_largestring_array(array)
-                .iter()
-                .zip(lengths.iter_mut())
-                .for_each(|(slice, length)| {
-                    *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
-                }),
-            DataType::Dictionary(_, _) => downcast_dictionary_array! {
-                array => {
-                    let dict = dict.as_ref().unwrap();
-                    for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
-                        match v.and_then(|v| dict[v as usize]) {
-                            Some(k) => *length += k.len() + 1,
-                            None => *length += 1,
+    for (array, encoder) in cols.iter().zip(encoders) {
+        match encoder {
+            Encoder::Stateless => {
+                downcast_primitive_array! {
+                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
+                    DataType::Null => {},
+                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
+                    DataType::Binary => as_generic_binary_array::<i32>(array)
+                        .iter()
+                        .zip(lengths.iter_mut())
+                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
+                        .iter()
+                        .zip(lengths.iter_mut())
+                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+                    DataType::Utf8 => as_string_array(array)

Review Comment:
   Seeing this list of types I think it is going to conflict with https://github.com/apache/arrow-rs/pull/3182



##########
arrow/src/row/mod.rs:
##########
@@ -332,8 +340,99 @@ mod variable;
 #[derive(Debug)]
 pub struct RowConverter {
     fields: Arc<[SortField]>,
-    /// interning state for column `i`, if column`i` is a dictionary
-    interners: Vec<Option<Box<OrderPreservingInterner>>>,
+    /// State for codecs
+    codecs: Vec<Codec>,
+}
+
+#[derive(Debug)]
+enum Codec {
+    /// No additional codec state is necessary
+    Stateless,
+    // The interner used to encode dictionary values
+    Dictionary(OrderPreservingInterner),
+    // A row converter for the child fields
+    // and the encoding of a row containing only nulls
+    Struct(RowConverter, OwnedRow),
+}
+
+impl Codec {
+    fn new(sort_field: &SortField) -> Result<Self> {
+        match &sort_field.data_type {
+            DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())),
+            d if !d.is_nested() => Ok(Self::Stateless),
+            DataType::Struct(f) => {
+                let sort_fields = f
+                    .iter()
+                    .map(|x| {
+                        SortField::new_with_options(
+                            x.data_type().clone(),
+                            sort_field.options,
+                        )
+                    })
+                    .collect();
+
+                let mut converter = RowConverter::new(sort_fields)?;
+                let nulls: Vec<_> =
+                    f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
+
+                let nulls = converter.convert_columns(&nulls)?;
+                let owned = OwnedRow {
+                    data: nulls.buffer,
+                    config: nulls.config,
+                };
+
+                Ok(Self::Struct(converter, owned))
+            }
+            _ => Err(ArrowError::NotYetImplemented(format!(
+                "not yet implemented: {:?}",

Review Comment:
   ```suggestion
                   "Row format support not yet implemented for: {:?}",
   ```



##########
arrow/src/row/mod.rs:
##########
@@ -307,6 +308,13 @@ mod variable;
 ///
 ///      Input                  Row Format
 /// ```
+///
+/// ## Struct Encoding

Review Comment:
   👍 



##########
arrow/src/row/mod.rs:
##########
@@ -1329,6 +1502,54 @@ mod tests {
         assert_eq!(&cols[0], &a);
     }
 
+    #[test]
+    fn test_struct() {

Review Comment:
   Would it be valuable to test more than one level (aka a struct array with a struct array child)? Also, how about a struct array with another type?



##########
arrow/src/row/mod.rs:
##########
@@ -872,35 +980,59 @@ fn encode_column(
     out: &mut Rows,
     column: &ArrayRef,
     opts: SortOptions,
-    dictionary: Option<&[Option<&[u8]>]>,
+    encoder: &Encoder<'_>,
 ) {
-    downcast_primitive_array! {
-        column => fixed::encode(out, column, opts),
-        DataType::Null => {}
-        DataType::Boolean => fixed::encode(out, as_boolean_array(column), opts),
-        DataType::Binary => {
-            variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+    match encoder {
+        Encoder::Stateless => {
+            downcast_primitive_array! {
+                column => fixed::encode(out, column, opts),
+                DataType::Null => {}
+                DataType::Boolean => fixed::encode(out, as_boolean_array(column), opts),
+                DataType::Binary => {
+                    variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+                }
+                DataType::LargeBinary => {
+                    variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+                }
+                DataType::Utf8 => variable::encode(
+                    out,
+                    as_string_array(column).iter().map(|x| x.map(|x| x.as_bytes())),
+                    opts,
+                ),
+                DataType::LargeUtf8 => variable::encode(
+                    out,
+                    as_largestring_array(column)
+                        .iter()
+                        .map(|x| x.map(|x| x.as_bytes())),
+                    opts,
+                ),
+                _ => unreachable!(),
+            }
         }
-        DataType::LargeBinary => {
-            variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+        Encoder::Dictionary(dict) => {
+            downcast_dictionary_array! {
+                column => encode_dictionary(out, column, dict, opts),
+                _ => unreachable!()
+            }
         }
-        DataType::Utf8 => variable::encode(
-            out,
-            as_string_array(column).iter().map(|x| x.map(|x| x.as_bytes())),
-            opts,
-        ),
-        DataType::LargeUtf8 => variable::encode(
-            out,
-            as_largestring_array(column)
-                .iter()
-                .map(|x| x.map(|x| x.as_bytes())),
-            opts,
-        ),
-        DataType::Dictionary(_, _) => downcast_dictionary_array! {
-            column => encode_dictionary(out, column, dictionary.unwrap(), opts),
-            _ => unreachable!()
+        Encoder::Struct(rows, null) => {
+            let array = as_struct_array(column.as_ref());
+            let null_sentinel = null_sentinel(opts);
+            out.offsets
+                .iter_mut()
+                .skip(1)
+                .enumerate()
+                .for_each(|(idx, offset)| {
+                    let (row, sentinel) = match array.is_valid(idx) {
+                        true => (rows.row(idx), 0x01),
+                        false => (*null, null_sentinel),
+                    };
+                    let end_offset = *offset + 1 + row.as_ref().len();
+                    out.buffer[*offset] = sentinel;
+                    out.buffer[*offset + 1..end_offset].copy_from_slice(row.as_ref());

Review Comment:
   Since the row format of Struct arrays can be variable length, how are we ensuring that the contents of one struct array aren't accidentally compared with the contents of another field (e.g. by using something like the COBS encoding approach used for variable length strings)?
   
   Also I wonder why not encode the struct array directly into `out` rather than copy it from another `Rows`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org