You are viewing a plain-text version of this content. The original archive page linked to the canonical version, but that hyperlink is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/30 14:45:58 UTC

[arrow-rs] branch master updated: Append Row to Rows (#4466) (#4470)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d7fa775cf Append Row to Rows (#4466) (#4470)
d7fa775cf is described below

commit d7fa775cf76c7cd54c6d2a86542115599d8f53ee
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Jun 30 15:45:52 2023 +0100

    Append Row to Rows (#4466) (#4470)
    
    * Append Row to Rows (#4466)
    
    * Tweak docs
    
    * Pass slices to encode
    
    * Clippy
---
 arrow-row/src/dictionary.rs |  22 ++++-----
 arrow-row/src/fixed.rs      |  20 ++++----
 arrow-row/src/lib.rs        | 110 ++++++++++++++++++++++++++++++++++----------
 arrow-row/src/list.rs       |  10 ++--
 arrow-row/src/variable.rs   |   9 ++--
 5 files changed, 118 insertions(+), 53 deletions(-)

diff --git a/arrow-row/src/dictionary.rs b/arrow-row/src/dictionary.rs
index d790d951e..6c3ee9e18 100644
--- a/arrow-row/src/dictionary.rs
+++ b/arrow-row/src/dictionary.rs
@@ -58,18 +58,19 @@ pub fn compute_dictionary_mapping(
 
 /// Encode dictionary values not preserving the dictionary encoding
 pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     column: &DictionaryArray<K>,
     values: &Rows,
     null: &Row<'_>,
 ) {
-    for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
         let row = match k {
             Some(k) => values.row(k.as_usize()).data,
             None => null.data,
         };
         let end_offset = *offset + row.len();
-        out.buffer[*offset..end_offset].copy_from_slice(row);
+        data[*offset..end_offset].copy_from_slice(row);
         *offset = end_offset;
     }
 }
@@ -79,27 +80,26 @@ pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
 /// - single `0_u8` if null
 /// - the bytes of the corresponding normalized key including the null terminator
 pub fn encode_dictionary<K: ArrowDictionaryKeyType>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     column: &DictionaryArray<K>,
     normalized_keys: &[Option<&[u8]>],
     opts: SortOptions,
 ) {
-    for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
         match k.and_then(|k| normalized_keys[k.as_usize()]) {
             Some(normalized_key) => {
                 let end_offset = *offset + 1 + normalized_key.len();
-                out.buffer[*offset] = 1;
-                out.buffer[*offset + 1..end_offset].copy_from_slice(normalized_key);
+                data[*offset] = 1;
+                data[*offset + 1..end_offset].copy_from_slice(normalized_key);
                 // Negate if descending
                 if opts.descending {
-                    out.buffer[*offset..end_offset]
-                        .iter_mut()
-                        .for_each(|v| *v = !*v)
+                    data[*offset..end_offset].iter_mut().for_each(|v| *v = !*v)
                 }
                 *offset = end_offset;
             }
             None => {
-                out.buffer[*offset] = null_sentinel(opts);
+                data[*offset] = null_sentinel(opts);
                 *offset += 1;
             }
         }
diff --git a/arrow-row/src/fixed.rs b/arrow-row/src/fixed.rs
index d4b82c2a3..831105bd5 100644
--- a/arrow-row/src/fixed.rs
+++ b/arrow-row/src/fixed.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::array::PrimitiveArray;
-use crate::{null_sentinel, Rows};
+use crate::null_sentinel;
 use arrow_array::builder::BufferBuilder;
 use arrow_array::{ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray};
 use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
@@ -177,14 +177,15 @@ where
 /// - 1 byte `0` if null or `1` if valid
 /// - bytes of [`FixedLengthEncoding`]
 pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     i: I,
     opts: SortOptions,
 ) {
-    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
         let end_offset = *offset + T::ENCODED_LEN;
         if let Some(val) = maybe_val {
-            let to_write = &mut out.buffer[*offset..end_offset];
+            let to_write = &mut data[*offset..end_offset];
             to_write[0] = 1;
             let mut encoded = val.encode();
             if opts.descending {
@@ -193,22 +194,23 @@ pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
             }
             to_write[1..].copy_from_slice(encoded.as_ref())
         } else {
-            out.buffer[*offset] = null_sentinel(opts);
+            data[*offset] = null_sentinel(opts);
         }
         *offset = end_offset;
     }
 }
 
 pub fn encode_fixed_size_binary(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     array: &FixedSizeBinaryArray,
     opts: SortOptions,
 ) {
     let len = array.value_length() as usize;
-    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(array.iter()) {
+    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(array.iter()) {
         let end_offset = *offset + len + 1;
         if let Some(val) = maybe_val {
-            let to_write = &mut out.buffer[*offset..end_offset];
+            let to_write = &mut data[*offset..end_offset];
             to_write[0] = 1;
             to_write[1..].copy_from_slice(&val[..len]);
             if opts.descending {
@@ -216,7 +218,7 @@ pub fn encode_fixed_size_binary(
                 to_write[1..1 + len].iter_mut().for_each(|v| *v = !*v)
             }
         } else {
-            out.buffer[*offset] = null_sentinel(opts);
+            data[*offset] = null_sentinel(opts);
         }
         *offset = end_offset;
     }
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 5b9a1bb88..e8c5ff708 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -458,7 +458,7 @@ impl Codec {
                     let nulls = converter.convert_columns(&[null_array])?;
 
                     let owned = OwnedRow {
-                        data: nulls.buffer,
+                        data: nulls.buffer.into(),
                         config: nulls.config,
                     };
                     Ok(Self::DictionaryValues(converter, owned))
@@ -496,7 +496,7 @@ impl Codec {
 
                 let nulls = converter.convert_columns(&nulls)?;
                 let owned = OwnedRow {
-                    data: nulls.buffer,
+                    data: nulls.buffer.into(),
                     config: nulls.config,
                 };
 
@@ -715,7 +715,13 @@ impl RowConverter {
             columns.iter().zip(self.fields.iter()).zip(encoders)
         {
             // We encode a column at a time to minimise dispatch overheads
-            encode_column(&mut rows, column.as_ref(), field.options, &encoder)
+            encode_column(
+                &mut rows.buffer,
+                &mut rows.offsets,
+                column.as_ref(),
+                field.options,
+                &encoder,
+            )
         }
 
         if cfg!(debug_assertions) {
@@ -756,6 +762,48 @@ impl RowConverter {
         unsafe { self.convert_raw(&mut rows, validate_utf8) }
     }
 
+    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
+    /// a total length of `data_capacity`
+    ///
+    /// This can be used to buffer a selection of [`Row`]
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use std::collections::HashSet;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::StringArray;
+    /// # use arrow_row::{Row, RowConverter, SortField};
+    /// # use arrow_schema::DataType;
+    /// #
+    /// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
+    ///
+    /// // Convert to row format and deduplicate
+    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
+    /// let mut distinct_rows = converter.empty_rows(3, 100);
+    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
+    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
+    ///
+    /// // Note: we could skip buffering and feed the filtered iterator directly
+    /// // into convert_rows, this is done for demonstration purposes only
+    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
+    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
+    /// assert_eq!(&values, &["hello", "world", "a"]);
+    /// ```
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
+        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
+        offsets.push(0);
+
+        Rows {
+            offsets,
+            buffer: Vec::with_capacity(data_capacity),
+            config: RowConfig {
+                fields: self.fields.clone(),
+                validate_utf8: false,
+            },
+        }
+    }
+
     /// Convert raw bytes into [`ArrayRef`]
     ///
     /// # Safety
@@ -832,14 +880,25 @@ struct RowConfig {
 #[derive(Debug)]
 pub struct Rows {
     /// Underlying row bytes
-    buffer: Box<[u8]>,
+    buffer: Vec<u8>,
     /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
-    offsets: Box<[usize]>,
+    offsets: Vec<usize>,
     /// The config for these rows
     config: RowConfig,
 }
 
 impl Rows {
+    /// Append a [`Row`] to this [`Rows`]
+    pub fn push(&mut self, row: Row<'_>) {
+        assert!(
+            Arc::ptr_eq(&row.config.fields, &self.config.fields),
+            "row was not produced by this RowConverter"
+        );
+        self.config.validate_utf8 |= row.config.validate_utf8;
+        self.buffer.extend_from_slice(row.data);
+        self.offsets.push(self.buffer.len())
+    }
+
     pub fn row(&self, row: usize) -> Row<'_> {
         let end = self.offsets[row + 1];
         let start = self.offsets[row];
@@ -1171,15 +1230,16 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
     let buffer = vec![0_u8; cur_offset];
 
     Rows {
-        buffer: buffer.into(),
-        offsets: offsets.into(),
+        buffer,
+        offsets,
         config,
     }
 }
 
 /// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
 fn encode_column(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     column: &dyn Array,
     opts: SortOptions,
     encoder: &Encoder<'_>,
@@ -1187,22 +1247,22 @@ fn encode_column(
     match encoder {
         Encoder::Stateless => {
             downcast_primitive_array! {
-                column => fixed::encode(out, column, opts),
+                column => fixed::encode(data, offsets, column, opts),
                 DataType::Null => {}
-                DataType::Boolean => fixed::encode(out, column.as_boolean(), opts),
+                DataType::Boolean => fixed::encode(data, offsets, column.as_boolean(), opts),
                 DataType::Binary => {
-                    variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                 }
                 DataType::LargeBinary => {
-                    variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                 }
                 DataType::Utf8 => variable::encode(
-                    out,
+                    data, offsets,
                     column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                     opts,
                 ),
                 DataType::LargeUtf8 => variable::encode(
-                    out,
+                    data, offsets,
                     column.as_string::<i64>()
                         .iter()
                         .map(|x| x.map(|x| x.as_bytes())),
@@ -1210,27 +1270,27 @@ fn encode_column(
                 ),
                 DataType::FixedSizeBinary(_) => {
                     let array = column.as_any().downcast_ref().unwrap();
-                    fixed::encode_fixed_size_binary(out, array, opts)
+                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                 }
                 _ => unreachable!(),
             }
         }
         Encoder::Dictionary(dict) => {
             downcast_dictionary_array! {
-                column => encode_dictionary(out, column, dict, opts),
+                column => encode_dictionary(data, offsets, column, dict, opts),
                 _ => unreachable!()
             }
         }
         Encoder::DictionaryValues(values, nulls) => {
             downcast_dictionary_array! {
-                column => encode_dictionary_values(out, column, values, nulls),
+                column => encode_dictionary_values(data, offsets, column, values, nulls),
                 _ => unreachable!()
             }
         }
         Encoder::Struct(rows, null) => {
             let array = as_struct_array(column);
             let null_sentinel = null_sentinel(opts);
-            out.offsets
+            offsets
                 .iter_mut()
                 .skip(1)
                 .enumerate()
@@ -1240,15 +1300,17 @@ fn encode_column(
                         false => (*null, null_sentinel),
                     };
                     let end_offset = *offset + 1 + row.as_ref().len();
-                    out.buffer[*offset] = sentinel;
-                    out.buffer[*offset + 1..end_offset].copy_from_slice(row.as_ref());
+                    data[*offset] = sentinel;
+                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                     *offset = end_offset;
                 })
         }
         Encoder::List(rows) => match column.data_type() {
-            DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+            DataType::List(_) => {
+                list::encode(data, offsets, rows, opts, as_list_array(column))
+            }
             DataType::LargeList(_) => {
-                list::encode(out, rows, opts, as_large_list_array(column))
+                list::encode(data, offsets, rows, opts, as_large_list_array(column))
             }
             _ => unreachable!(),
         },
@@ -1384,9 +1446,9 @@ mod tests {
         .unwrap();
         let rows = converter.convert_columns(&cols).unwrap();
 
-        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
+        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
         assert_eq!(
-            rows.buffer.as_ref(),
+            rows.buffer,
             &[
                 1, 128, 1, //
                 1, 191, 166, 102, 102, //
diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs
index e4ff878dd..73c4b6fbf 100644
--- a/arrow-row/src/list.rs
+++ b/arrow-row/src/list.rs
@@ -57,23 +57,23 @@ fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
 ///
 /// `rows` should contain the encoded child elements
 pub fn encode<O: OffsetSizeTrait>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     rows: &Rows,
     opts: SortOptions,
     array: &GenericListArray<O>,
 ) {
     let mut temporary = vec![];
-    let offsets = array.value_offsets().windows(2);
-    out.offsets
+    offsets
         .iter_mut()
         .skip(1)
-        .zip(offsets)
+        .zip(array.value_offsets().windows(2))
         .enumerate()
         .for_each(|(idx, (offset, offsets))| {
             let start = offsets[0].as_usize();
             let end = offsets[1].as_usize();
             let range = array.is_valid(idx).then_some(start..end);
-            let out = &mut out.buffer[*offset..];
+            let out = &mut data[*offset..];
             *offset += encode_one(out, &mut temporary, rows, range, opts)
         });
 }
diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs
index c927f7696..e9f6160bf 100644
--- a/arrow-row/src/variable.rs
+++ b/arrow-row/src/variable.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{null_sentinel, Rows};
+use crate::null_sentinel;
 use arrow_array::builder::BufferBuilder;
 use arrow_array::*;
 use arrow_buffer::bit_util::ceil;
@@ -62,12 +62,13 @@ pub fn padded_length(a: Option<usize>) -> usize {
 /// - `0xFF_u8` if this is not the last block for this string
 /// - otherwise the length of the block as a `u8`
 pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     i: I,
     opts: SortOptions,
 ) {
-    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
-        *offset += encode_one(&mut out.buffer[*offset..], maybe_val, opts);
+    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
+        *offset += encode_one(&mut data[*offset..], maybe_val, opts);
     }
 }