You are viewing a plain-text version of this content; the hyperlink to its canonical location was not preserved in this copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/30 14:45:58 UTC
[arrow-rs] branch master updated: Append Row to Rows (#4466) (#4470)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d7fa775cf Append Row to Rows (#4466) (#4470)
d7fa775cf is described below
commit d7fa775cf76c7cd54c6d2a86542115599d8f53ee
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Jun 30 15:45:52 2023 +0100
Append Row to Rows (#4466) (#4470)
* Append Row to Rows (#4466)
* Tweak docs
* Pass slices to encode
* Clippy
---
arrow-row/src/dictionary.rs | 22 ++++-----
arrow-row/src/fixed.rs | 20 ++++----
arrow-row/src/lib.rs | 110 ++++++++++++++++++++++++++++++++++----------
arrow-row/src/list.rs | 10 ++--
arrow-row/src/variable.rs | 9 ++--
5 files changed, 118 insertions(+), 53 deletions(-)
diff --git a/arrow-row/src/dictionary.rs b/arrow-row/src/dictionary.rs
index d790d951e..6c3ee9e18 100644
--- a/arrow-row/src/dictionary.rs
+++ b/arrow-row/src/dictionary.rs
@@ -58,18 +58,19 @@ pub fn compute_dictionary_mapping(
/// Encode dictionary values not preserving the dictionary encoding
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
column: &DictionaryArray<K>,
values: &Rows,
null: &Row<'_>,
) {
- for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+ for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
let row = match k {
Some(k) => values.row(k.as_usize()).data,
None => null.data,
};
let end_offset = *offset + row.len();
- out.buffer[*offset..end_offset].copy_from_slice(row);
+ data[*offset..end_offset].copy_from_slice(row);
*offset = end_offset;
}
}
@@ -79,27 +80,26 @@ pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
/// - single `0_u8` if null
/// - the bytes of the corresponding normalized key including the null terminator
pub fn encode_dictionary<K: ArrowDictionaryKeyType>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
column: &DictionaryArray<K>,
normalized_keys: &[Option<&[u8]>],
opts: SortOptions,
) {
- for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+ for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
match k.and_then(|k| normalized_keys[k.as_usize()]) {
Some(normalized_key) => {
let end_offset = *offset + 1 + normalized_key.len();
- out.buffer[*offset] = 1;
- out.buffer[*offset + 1..end_offset].copy_from_slice(normalized_key);
+ data[*offset] = 1;
+ data[*offset + 1..end_offset].copy_from_slice(normalized_key);
// Negate if descending
if opts.descending {
- out.buffer[*offset..end_offset]
- .iter_mut()
- .for_each(|v| *v = !*v)
+ data[*offset..end_offset].iter_mut().for_each(|v| *v = !*v)
}
*offset = end_offset;
}
None => {
- out.buffer[*offset] = null_sentinel(opts);
+ data[*offset] = null_sentinel(opts);
*offset += 1;
}
}
diff --git a/arrow-row/src/fixed.rs b/arrow-row/src/fixed.rs
index d4b82c2a3..831105bd5 100644
--- a/arrow-row/src/fixed.rs
+++ b/arrow-row/src/fixed.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::array::PrimitiveArray;
-use crate::{null_sentinel, Rows};
+use crate::null_sentinel;
use arrow_array::builder::BufferBuilder;
use arrow_array::{ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray};
use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
@@ -177,14 +177,15 @@ where
/// - 1 byte `0` if null or `1` if valid
/// - bytes of [`FixedLengthEncoding`]
pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
i: I,
opts: SortOptions,
) {
- for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+ for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
let end_offset = *offset + T::ENCODED_LEN;
if let Some(val) = maybe_val {
- let to_write = &mut out.buffer[*offset..end_offset];
+ let to_write = &mut data[*offset..end_offset];
to_write[0] = 1;
let mut encoded = val.encode();
if opts.descending {
@@ -193,22 +194,23 @@ pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
}
to_write[1..].copy_from_slice(encoded.as_ref())
} else {
- out.buffer[*offset] = null_sentinel(opts);
+ data[*offset] = null_sentinel(opts);
}
*offset = end_offset;
}
}
pub fn encode_fixed_size_binary(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
array: &FixedSizeBinaryArray,
opts: SortOptions,
) {
let len = array.value_length() as usize;
- for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(array.iter()) {
+ for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(array.iter()) {
let end_offset = *offset + len + 1;
if let Some(val) = maybe_val {
- let to_write = &mut out.buffer[*offset..end_offset];
+ let to_write = &mut data[*offset..end_offset];
to_write[0] = 1;
to_write[1..].copy_from_slice(&val[..len]);
if opts.descending {
@@ -216,7 +218,7 @@ pub fn encode_fixed_size_binary(
to_write[1..1 + len].iter_mut().for_each(|v| *v = !*v)
}
} else {
- out.buffer[*offset] = null_sentinel(opts);
+ data[*offset] = null_sentinel(opts);
}
*offset = end_offset;
}
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 5b9a1bb88..e8c5ff708 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -458,7 +458,7 @@ impl Codec {
let nulls = converter.convert_columns(&[null_array])?;
let owned = OwnedRow {
- data: nulls.buffer,
+ data: nulls.buffer.into(),
config: nulls.config,
};
Ok(Self::DictionaryValues(converter, owned))
@@ -496,7 +496,7 @@ impl Codec {
let nulls = converter.convert_columns(&nulls)?;
let owned = OwnedRow {
- data: nulls.buffer,
+ data: nulls.buffer.into(),
config: nulls.config,
};
@@ -715,7 +715,13 @@ impl RowConverter {
columns.iter().zip(self.fields.iter()).zip(encoders)
{
// We encode a column at a time to minimise dispatch overheads
- encode_column(&mut rows, column.as_ref(), field.options, &encoder)
+ encode_column(
+ &mut rows.buffer,
+ &mut rows.offsets,
+ column.as_ref(),
+ field.options,
+ &encoder,
+ )
}
if cfg!(debug_assertions) {
@@ -756,6 +762,48 @@ impl RowConverter {
unsafe { self.convert_raw(&mut rows, validate_utf8) }
}
+ /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
+ /// a total length of `data_capacity`
+ ///
+ /// This can be used to buffer a selection of [`Row`]
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use std::collections::HashSet;
+ /// # use arrow_array::cast::AsArray;
+ /// # use arrow_array::StringArray;
+ /// # use arrow_row::{Row, RowConverter, SortField};
+ /// # use arrow_schema::DataType;
+ /// #
+ /// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+ /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
+ ///
+ /// // Convert to row format and deduplicate
+ /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
+ /// let mut distinct_rows = converter.empty_rows(3, 100);
+ /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
+ /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
+ ///
+ /// // Note: we could skip buffering and feed the filtered iterator directly
+ /// // into convert_rows, this is done for demonstration purposes only
+ /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
+ /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
+ /// assert_eq!(&values, &["hello", "world", "a"]);
+ /// ```
+ pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
+ let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
+ offsets.push(0);
+
+ Rows {
+ offsets,
+ buffer: Vec::with_capacity(data_capacity),
+ config: RowConfig {
+ fields: self.fields.clone(),
+ validate_utf8: false,
+ },
+ }
+ }
+
/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
@@ -832,14 +880,25 @@ struct RowConfig {
#[derive(Debug)]
pub struct Rows {
/// Underlying row bytes
- buffer: Box<[u8]>,
+ buffer: Vec<u8>,
/// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
- offsets: Box<[usize]>,
+ offsets: Vec<usize>,
/// The config for these rows
config: RowConfig,
}
impl Rows {
+ /// Append a [`Row`] to this [`Rows`]
+ pub fn push(&mut self, row: Row<'_>) {
+ assert!(
+ Arc::ptr_eq(&row.config.fields, &self.config.fields),
+ "row was not produced by this RowConverter"
+ );
+ self.config.validate_utf8 |= row.config.validate_utf8;
+ self.buffer.extend_from_slice(row.data);
+ self.offsets.push(self.buffer.len())
+ }
+
pub fn row(&self, row: usize) -> Row<'_> {
let end = self.offsets[row + 1];
let start = self.offsets[row];
@@ -1171,15 +1230,16 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
let buffer = vec![0_u8; cur_offset];
Rows {
- buffer: buffer.into(),
- offsets: offsets.into(),
+ buffer,
+ offsets,
config,
}
}
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
fn encode_column(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
column: &dyn Array,
opts: SortOptions,
encoder: &Encoder<'_>,
@@ -1187,22 +1247,22 @@ fn encode_column(
match encoder {
Encoder::Stateless => {
downcast_primitive_array! {
- column => fixed::encode(out, column, opts),
+ column => fixed::encode(data, offsets, column, opts),
DataType::Null => {}
- DataType::Boolean => fixed::encode(out, column.as_boolean(), opts),
+ DataType::Boolean => fixed::encode(data, offsets, column.as_boolean(), opts),
DataType::Binary => {
- variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+ variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
}
DataType::LargeBinary => {
- variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+ variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
}
DataType::Utf8 => variable::encode(
- out,
+ data, offsets,
column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
opts,
),
DataType::LargeUtf8 => variable::encode(
- out,
+ data, offsets,
column.as_string::<i64>()
.iter()
.map(|x| x.map(|x| x.as_bytes())),
@@ -1210,27 +1270,27 @@ fn encode_column(
),
DataType::FixedSizeBinary(_) => {
let array = column.as_any().downcast_ref().unwrap();
- fixed::encode_fixed_size_binary(out, array, opts)
+ fixed::encode_fixed_size_binary(data, offsets, array, opts)
}
_ => unreachable!(),
}
}
Encoder::Dictionary(dict) => {
downcast_dictionary_array! {
- column => encode_dictionary(out, column, dict, opts),
+ column => encode_dictionary(data, offsets, column, dict, opts),
_ => unreachable!()
}
}
Encoder::DictionaryValues(values, nulls) => {
downcast_dictionary_array! {
- column => encode_dictionary_values(out, column, values, nulls),
+ column => encode_dictionary_values(data, offsets, column, values, nulls),
_ => unreachable!()
}
}
Encoder::Struct(rows, null) => {
let array = as_struct_array(column);
let null_sentinel = null_sentinel(opts);
- out.offsets
+ offsets
.iter_mut()
.skip(1)
.enumerate()
@@ -1240,15 +1300,17 @@ fn encode_column(
false => (*null, null_sentinel),
};
let end_offset = *offset + 1 + row.as_ref().len();
- out.buffer[*offset] = sentinel;
- out.buffer[*offset + 1..end_offset].copy_from_slice(row.as_ref());
+ data[*offset] = sentinel;
+ data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
*offset = end_offset;
})
}
Encoder::List(rows) => match column.data_type() {
- DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+ DataType::List(_) => {
+ list::encode(data, offsets, rows, opts, as_list_array(column))
+ }
DataType::LargeList(_) => {
- list::encode(out, rows, opts, as_large_list_array(column))
+ list::encode(data, offsets, rows, opts, as_large_list_array(column))
}
_ => unreachable!(),
},
@@ -1384,9 +1446,9 @@ mod tests {
.unwrap();
let rows = converter.convert_columns(&cols).unwrap();
- assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
+ assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
assert_eq!(
- rows.buffer.as_ref(),
+ rows.buffer,
&[
1, 128, 1, //
1, 191, 166, 102, 102, //
diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs
index e4ff878dd..73c4b6fbf 100644
--- a/arrow-row/src/list.rs
+++ b/arrow-row/src/list.rs
@@ -57,23 +57,23 @@ fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
///
/// `rows` should contain the encoded child elements
pub fn encode<O: OffsetSizeTrait>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
rows: &Rows,
opts: SortOptions,
array: &GenericListArray<O>,
) {
let mut temporary = vec![];
- let offsets = array.value_offsets().windows(2);
- out.offsets
+ offsets
.iter_mut()
.skip(1)
- .zip(offsets)
+ .zip(array.value_offsets().windows(2))
.enumerate()
.for_each(|(idx, (offset, offsets))| {
let start = offsets[0].as_usize();
let end = offsets[1].as_usize();
let range = array.is_valid(idx).then_some(start..end);
- let out = &mut out.buffer[*offset..];
+ let out = &mut data[*offset..];
*offset += encode_one(out, &mut temporary, rows, range, opts)
});
}
diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs
index c927f7696..e9f6160bf 100644
--- a/arrow-row/src/variable.rs
+++ b/arrow-row/src/variable.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{null_sentinel, Rows};
+use crate::null_sentinel;
use arrow_array::builder::BufferBuilder;
use arrow_array::*;
use arrow_buffer::bit_util::ceil;
@@ -62,12 +62,13 @@ pub fn padded_length(a: Option<usize>) -> usize {
/// - `0xFF_u8` if this is not the last block for this string
/// - otherwise the length of the block as a `u8`
pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
i: I,
opts: SortOptions,
) {
- for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
- *offset += encode_one(&mut out.buffer[*offset..], maybe_val, opts);
+ for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
+ *offset += encode_one(&mut data[*offset..], maybe_val, opts);
}
}