Posted to commits@arrow.apache.org by tu...@apache.org on 2023/09/05 15:02:52 UTC
[arrow-rs] branch master updated: Re-encode dictionaries in selection kernels (#3558)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 65edbb1702 Re-encode dictionaries in selection kernels (#3558)
65edbb1702 is described below
commit 65edbb1702e25420aacebe656dcd789690f72c82
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Sep 5 16:02:46 2023 +0100
Re-encode dictionaries in selection kernels (#3558)
* Re-encode dictionaries in selection kernels
* More benchmarks
* Best-effort hashing
* More benchmarks
* Add fallback to concatenating dictionaries
* Fix nulls
* Format
* Cleanup
* RAT
* Clippy
* Split out heuristic
* Add support to interleave kernel
* Clippy
* More clippy
* Clippy
* Cleanup
* Optimize concat
* Review feedback
* Clippy
* Improved null handling
* Further tests
* Faster ptr_eq
---
arrow-buffer/src/buffer/immutable.rs | 8 +
arrow-buffer/src/buffer/offset.rs | 8 +
arrow-buffer/src/buffer/scalar.rs | 8 +
arrow-select/Cargo.toml | 1 +
arrow-select/src/concat.rs | 186 ++++++++++++++-----
arrow-select/src/dictionary.rs | 333 +++++++++++++++++++++++++++++++++++
arrow-select/src/interleave.rs | 165 +++++++++++++----
arrow-select/src/lib.rs | 1 +
arrow/benches/concatenate_kernel.rs | 22 +++
arrow/benches/interleave_kernels.rs | 32 +++-
arrow/src/util/bench_util.rs | 25 ++-
11 files changed, 698 insertions(+), 91 deletions(-)
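
In short: MutableArrayData, which previously backed both kernels, naively
concatenates dictionary values, so repeated selection could grow dictionaries
without bound. After this change, concat and interleave re-encode dictionary
keys against a single values array, merging values when a heuristic suggests
the merge will pay for itself. A minimal usage sketch (not part of the diff;
assumes the arrow-array and arrow-select crates as of this commit):

    use arrow_array::{cast::AsArray, types::Int32Type, DictionaryArray};
    use arrow_select::concat::concat;

    fn main() {
        // Two dictionary arrays backed by different values arrays
        let a: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
        let b: DictionaryArray<Int32Type> = vec!["b", "c"].into_iter().collect();

        let combined = concat(&[&a, &b]).unwrap();
        let dict = combined.as_dictionary::<Int32Type>();

        // Keys are re-encoded against a single child values array; whether
        // that array is merged or naively concatenated is decided by the
        // should_merge_dictionary_values heuristic added in this commit.
        assert_eq!(dict.len(), 5);
    }
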
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 8296d3fbcc..bda6dfc5cd 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -323,6 +323,14 @@ impl Buffer {
length,
})
}
+
+ /// Returns true if this [`Buffer`] is equal to `other`, using pointer comparisons
+ /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+ /// return false when the arrays are logically equal
+ #[inline]
+ pub fn ptr_eq(&self, other: &Self) -> bool {
+ self.ptr == other.ptr && self.length == other.length
+ }
}
/// Creating a `Buffer` instance by copying the memory from an `AsRef<[u8]>` into a newly
diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs
index fede32c579..a6f2f7f6cf 100644
--- a/arrow-buffer/src/buffer/offset.rs
+++ b/arrow-buffer/src/buffer/offset.rs
@@ -148,6 +148,14 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
pub fn slice(&self, offset: usize, len: usize) -> Self {
Self(self.0.slice(offset, len.saturating_add(1)))
}
+
+ /// Returns true if this [`OffsetBuffer`] is equal to `other`, using pointer comparisons
+ /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+ /// return false when the arrays are logically equal
+ #[inline]
+ pub fn ptr_eq(&self, other: &Self) -> bool {
+ self.0.ptr_eq(&other.0)
+ }
}
impl<T: ArrowNativeType> Deref for OffsetBuffer<T> {
diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs
index 70c86f1186..276e635e82 100644
--- a/arrow-buffer/src/buffer/scalar.rs
+++ b/arrow-buffer/src/buffer/scalar.rs
@@ -86,6 +86,14 @@ impl<T: ArrowNativeType> ScalarBuffer<T> {
pub fn into_inner(self) -> Buffer {
self.buffer
}
+
+ /// Returns true if this [`ScalarBuffer`] is equal to `other`, using pointer comparisons
+ /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+ /// return false when the arrays are logically equal
+ #[inline]
+ pub fn ptr_eq(&self, other: &Self) -> bool {
+ self.buffer.ptr_eq(&other.buffer)
+ }
}
impl<T: ArrowNativeType> Deref for ScalarBuffer<T> {
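
The three ptr_eq additions above give the merge heuristic a cheap identity
check: equal pointer and length imply equal contents, while logically equal
buffers in separate allocations compare unequal, which is the acceptable
false negative the doc comments describe. A sketch (Buffer's `AsRef<[u8]>`
constructor copies, whereas clone shares the underlying allocation):

    use arrow_buffer::Buffer;

    fn main() {
        let a = Buffer::from(&[0u8, 1, 2, 3][..]);
        let b = a.clone(); // shares a's allocation
        let c = Buffer::from(&[0u8, 1, 2, 3][..]); // equal bytes, new allocation

        assert!(a.ptr_eq(&b)); // same pointer and length
        assert!(a == c); // logically equal...
        assert!(!a.ptr_eq(&c)); // ...but not pointer-equal
    }
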
diff --git a/arrow-select/Cargo.toml b/arrow-select/Cargo.toml
index ff8a212c7b..023788799c 100644
--- a/arrow-select/Cargo.toml
+++ b/arrow-select/Cargo.toml
@@ -39,6 +39,7 @@ arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-array = { workspace = true }
num = { version = "0.4", default-features = false, features = ["std"] }
+ahash = { version = "0.8", default-features = false}
[features]
default = []
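
The new ahash dependency serves the Interner added below in
arrow-select/src/dictionary.rs, which wants a fast hasher with reproducible
output. A sketch of the two ahash 0.8 calls the interner relies on:

    use ahash::RandomState;

    fn main() {
        // Fixed seeds make bucket assignment deterministic across runs
        let state = RandomState::with_seeds(0, 0, 0, 0);
        let h1 = state.hash_one(&b"hello"[..]);
        let h2 = state.hash_one(&b"hello"[..]);
        assert_eq!(h1, h2);
    }
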
diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index eed20699c2..c34c3d3d0c 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -30,20 +30,20 @@
//! assert_eq!(arr.len(), 3);
//! ```
+use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
+use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
-use arrow_buffer::ArrowNativeType;
+use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer};
use arrow_data::transform::{Capacities, MutableArrayData};
use arrow_schema::{ArrowError, DataType, SchemaRef};
+use std::sync::Arc;
fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
let mut item_capacity = 0;
let mut bytes_capacity = 0;
for array in arrays {
- let a = array
- .as_any()
- .downcast_ref::<GenericByteArray<T>>()
- .unwrap();
+ let a = array.as_bytes::<T>();
// Guaranteed to always have at least one element
let offsets = a.value_offsets();
@@ -54,6 +54,59 @@ fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
Capacities::Binary(item_capacity, Some(bytes_capacity))
}
+fn concat_dictionaries<K: ArrowDictionaryKeyType>(
+ arrays: &[&dyn Array],
+) -> Result<ArrayRef, ArrowError> {
+ let mut output_len = 0;
+ let dictionaries: Vec<_> = arrays
+ .iter()
+ .map(|x| x.as_dictionary::<K>())
+ .inspect(|d| output_len += d.len())
+ .collect();
+
+ if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
+ return concat_fallback(arrays, Capacities::Array(output_len));
+ }
+
+ let merged = merge_dictionary_values(&dictionaries, None)?;
+
+ // Recompute keys
+ let mut key_values = Vec::with_capacity(output_len);
+
+ let mut has_nulls = false;
+ for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
+ has_nulls |= d.null_count() != 0;
+ for key in d.keys().values() {
+ // Use get to safely handle nulls
+ key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
+ }
+ }
+
+ let nulls = has_nulls.then(|| {
+ let mut nulls = BooleanBufferBuilder::new(output_len);
+ for d in &dictionaries {
+ match d.nulls() {
+ Some(n) => nulls.append_buffer(n.inner()),
+ None => nulls.append_n(d.len(), true),
+ }
+ }
+ NullBuffer::new(nulls.finish())
+ });
+
+ let keys = PrimitiveArray::<K>::new(key_values.into(), nulls);
+ // Sanity check
+ assert_eq!(keys.len(), output_len);
+
+ let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
+ Ok(Arc::new(array))
+}
+
+macro_rules! dict_helper {
+ ($t:ty, $arrays:expr) => {
+ return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
+ };
+}
+
/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
if arrays.is_empty() {
@@ -78,9 +131,23 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
DataType::Binary => binary_capacity::<BinaryType>(arrays),
DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
+ DataType::Dictionary(k, _) => downcast_integer! {
+ k.as_ref() => (dict_helper, arrays),
+ _ => unreachable!("illegal dictionary key type {k}")
+ },
_ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
};
+ concat_fallback(arrays, capacity)
+}
+
+/// Concatenates arrays using MutableArrayData
+///
+/// This will naively concatenate dictionaries
+fn concat_fallback(
+ arrays: &[&dyn Array],
+ capacity: Capacities,
+) -> Result<ArrayRef, ArrowError> {
let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
let array_data = array_data.iter().collect();
let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
@@ -140,6 +207,7 @@ pub fn concat_batches<'a>(
#[cfg(test)]
mod tests {
use super::*;
+ use arrow_array::builder::StringDictionaryBuilder;
use arrow_array::cast::AsArray;
use arrow_schema::{Field, Schema};
use std::sync::Arc;
@@ -468,29 +536,10 @@ mod tests {
}
fn collect_string_dictionary(
- dictionary: &DictionaryArray<Int32Type>,
- ) -> Vec<Option<String>> {
- let values = dictionary.values();
- let values = values.as_any().downcast_ref::<StringArray>().unwrap();
-
- dictionary
- .keys()
- .iter()
- .map(|key| key.map(|key| values.value(key as _).to_string()))
- .collect()
- }
-
- fn concat_dictionary(
- input_1: DictionaryArray<Int32Type>,
- input_2: DictionaryArray<Int32Type>,
- ) -> Vec<Option<String>> {
- let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
- let concat = concat
- .as_any()
- .downcast_ref::<DictionaryArray<Int32Type>>()
- .unwrap();
-
- collect_string_dictionary(concat)
+ array: &DictionaryArray<Int32Type>,
+ ) -> Vec<Option<&str>> {
+ let concrete = array.downcast_dict::<StringArray>().unwrap();
+ concrete.into_iter().collect()
}
#[test]
@@ -509,11 +558,19 @@ mod tests {
"E",
]
.into_iter()
- .map(|x| Some(x.to_string()))
+ .map(Some)
.collect();
- let concat = concat_dictionary(input_1, input_2);
- assert_eq!(concat, expected);
+ let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
+ let dictionary = concat.as_dictionary::<Int32Type>();
+ let actual = collect_string_dictionary(dictionary);
+ assert_eq!(actual, expected);
+
+ // Should have concatenated inputs together
+ assert_eq!(
+ dictionary.values().len(),
+ input_1.values().len() + input_2.values().len(),
+ )
}
#[test]
@@ -523,16 +580,45 @@ mod tests {
.into_iter()
.collect();
let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
- let expected = vec![
- Some("foo".to_string()),
- Some("bar".to_string()),
- None,
- Some("fiz".to_string()),
- None,
- ];
+ let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
- let concat = concat_dictionary(input_1, input_2);
- assert_eq!(concat, expected);
+ let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
+ let dictionary = concat.as_dictionary::<Int32Type>();
+ let actual = collect_string_dictionary(dictionary);
+ assert_eq!(actual, expected);
+
+ // Should have concatenated inputs together
+ assert_eq!(
+ dictionary.values().len(),
+ input_1.values().len() + input_2.values().len(),
+ )
+ }
+
+ #[test]
+ fn test_string_dictionary_merge() {
+ let mut builder = StringDictionaryBuilder::<Int32Type>::new();
+ for i in 0..20 {
+ builder.append(&i.to_string()).unwrap();
+ }
+ let input_1 = builder.finish();
+
+ let mut builder = StringDictionaryBuilder::<Int32Type>::new();
+ for i in 0..30 {
+ builder.append(&i.to_string()).unwrap();
+ }
+ let input_2 = builder.finish();
+
+ let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
+ let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
+
+ let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
+ let dictionary = concat.as_dictionary::<Int32Type>();
+ let actual = collect_string_dictionary(dictionary);
+ assert_eq!(actual, expected);
+
+ // Should have merged inputs together
+ // Not 30 as this is done on a best-effort basis
+ assert_eq!(dictionary.values().len(), 33)
}
#[test]
@@ -556,7 +642,7 @@ mod tests {
fn test_dictionary_concat_reuse() {
let array: DictionaryArray<Int8Type> =
vec!["a", "a", "b", "c"].into_iter().collect();
- let copy: DictionaryArray<Int8Type> = array.to_data().into();
+ let copy: DictionaryArray<Int8Type> = array.clone();
// dictionary is "a", "b", "c"
assert_eq!(
@@ -567,11 +653,7 @@ mod tests {
// concatenate it with itself
let combined = concat(&[&copy as _, &array as _]).unwrap();
-
- let combined = combined
- .as_any()
- .downcast_ref::<DictionaryArray<Int8Type>>()
- .unwrap();
+ let combined = combined.as_dictionary::<Int8Type>();
assert_eq!(
combined.values(),
@@ -738,4 +820,16 @@ mod tests {
assert_eq!(data.buffers()[1].len(), 200);
assert_eq!(data.buffers()[1].capacity(), 256); // Nearest multiple of 64
}
+
+ #[test]
+ fn concat_sparse_nulls() {
+ let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
+ let keys = Int32Array::from(vec![1; 10]);
+ let dict_a = DictionaryArray::new(keys, Arc::new(values));
+ let values = StringArray::new_null(0);
+ let keys = Int32Array::new_null(10);
+ let dict_b = DictionaryArray::new(keys, Arc::new(values));
+ let array = concat(&[&dict_a, &dict_b]).unwrap();
+ assert_eq!(array.null_count(), 10);
+ }
}
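
Whether concat_dictionaries merges or falls back is decided by
should_merge_dictionary_values, defined in the new
arrow-select/src/dictionary.rs below. A standalone paraphrase of that
heuristic (hypothetical function name, not the commit's code):

    /// Merging is only worthwhile when the inputs do not all share one
    /// values array, and either the key type would overflow or the combined
    /// child values are at least as long as the requested output.
    fn worth_merging(
        shares_one_dictionary: bool,
        total_values: usize,
        output_len: usize,
        keys_overflow: bool,
    ) -> bool {
        !shares_one_dictionary && (keys_overflow || total_values >= output_len)
    }

    fn main() {
        // test_string_dictionary_merge above: 20 + 30 = 50 values, 50 rows
        assert!(worth_merging(false, 50, 50, false));
        // Small dictionaries, large output: naive concatenation is cheaper
        assert!(!worth_merging(false, 20, 2048, false));
        // All inputs share one values array: MutableArrayData reuses it
        assert!(!worth_merging(true, 50, 50, false));
    }
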
diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs
new file mode 100644
index 0000000000..8630b332f0
--- /dev/null
+++ b/arrow-select/src/dictionary.rs
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::interleave::interleave;
+use ahash::RandomState;
+use arrow_array::builder::BooleanBufferBuilder;
+use arrow_array::cast::AsArray;
+use arrow_array::types::{
+ ArrowDictionaryKeyType, BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type,
+ Utf8Type,
+};
+use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray};
+use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer};
+use arrow_schema::{ArrowError, DataType};
+
+/// A best effort interner that maintains a fixed number of buckets
+/// and interns keys based on their hash value
+///
+/// Hash collisions will result in replacement
+struct Interner<'a, V> {
+ state: RandomState,
+ buckets: Vec<Option<(&'a [u8], V)>>,
+ shift: u32,
+}
+
+impl<'a, V> Interner<'a, V> {
+ /// Capacity controls the number of unique buckets allocated within the Interner
+ ///
+ /// A larger capacity reduces the probability of hash collisions, and should be set
+ /// based on an approximation of the upper bound of unique values
+ fn new(capacity: usize) -> Self {
+ // Add additional buckets to help reduce collisions
+ let shift = (capacity as u64 + 128).leading_zeros();
+ let num_buckets = (u64::MAX >> shift) as usize;
+ let buckets = (0..num_buckets.saturating_add(1)).map(|_| None).collect();
+ Self {
+ // A fixed seed to ensure deterministic behaviour
+ state: RandomState::with_seeds(0, 0, 0, 0),
+ buckets,
+ shift,
+ }
+ }
+
+ fn intern<F: FnOnce() -> Result<V, E>, E>(
+ &mut self,
+ new: &'a [u8],
+ f: F,
+ ) -> Result<&V, E> {
+ let hash = self.state.hash_one(new);
+ let bucket_idx = hash >> self.shift;
+ Ok(match &mut self.buckets[bucket_idx as usize] {
+ Some((current, v)) => {
+ if *current != new {
+ *v = f()?;
+ *current = new;
+ }
+ v
+ }
+ slot => &slot.insert((new, f()?)).1,
+ })
+ }
+}
+
+pub struct MergedDictionaries<K: ArrowDictionaryKeyType> {
+ /// Provides `key_mappings[array_idx][old_key] -> new_key`
+ pub key_mappings: Vec<Vec<K::Native>>,
+ /// The new values
+ pub values: ArrayRef,
+}
+
+/// Performs a cheap, pointer-based comparison of two byte arrays
+///
+/// See [`ScalarBuffer::ptr_eq`]
+fn bytes_ptr_eq<T: ByteArrayType>(a: &dyn Array, b: &dyn Array) -> bool {
+ match (a.as_bytes_opt::<T>(), b.as_bytes_opt::<T>()) {
+ (Some(a), Some(b)) => {
+ let values_eq =
+ a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets());
+ match (a.nulls(), b.nulls()) {
+ (Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()),
+ (None, None) => values_eq,
+ _ => false,
+ }
+ }
+ _ => false,
+ }
+}
+
+/// A type-erased function that compares two arrays for pointer equality
+type PtrEq = dyn Fn(&dyn Array, &dyn Array) -> bool;
+
+/// A weak heuristic of whether to merge dictionary values that aims to only
+/// perform the expensive merge computation when it is likely to yield at least
+/// some return over the naive approach used by MutableArrayData
+///
+/// `len` is the total length of the merged output
+pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(
+ dictionaries: &[&DictionaryArray<K>],
+ len: usize,
+) -> bool {
+ use DataType::*;
+ let first_values = dictionaries[0].values().as_ref();
+ let ptr_eq: Box<PtrEq> = match first_values.data_type() {
+ Utf8 => Box::new(bytes_ptr_eq::<Utf8Type>),
+ LargeUtf8 => Box::new(bytes_ptr_eq::<LargeUtf8Type>),
+ Binary => Box::new(bytes_ptr_eq::<BinaryType>),
+ LargeBinary => Box::new(bytes_ptr_eq::<LargeBinaryType>),
+ _ => return false,
+ };
+
+ let mut single_dictionary = true;
+ let mut total_values = first_values.len();
+ for dict in dictionaries.iter().skip(1) {
+ let values = dict.values().as_ref();
+ total_values += values.len();
+ if single_dictionary {
+ single_dictionary = ptr_eq(first_values, values)
+ }
+ }
+
+ let overflow = K::Native::from_usize(total_values).is_none();
+ let values_exceed_length = total_values >= len;
+
+ !single_dictionary && (overflow || values_exceed_length)
+}
+
+/// Given an array of dictionaries and an optional key mask, compute a values array
+/// containing referenced values, along with mappings from the [`DictionaryArray`]
+/// keys to the new keys within this values array. A best effort will be made to
+/// ensure that the dictionary values are unique
+///
+/// This method is meant to be very fast and the output dictionary values
+/// may not be unique, unlike `GenericByteDictionaryBuilder` which is slower
+/// but produces unique values
+pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(
+ dictionaries: &[&DictionaryArray<K>],
+ masks: Option<&[BooleanBuffer]>,
+) -> Result<MergedDictionaries<K>, ArrowError> {
+ let mut num_values = 0;
+
+ let mut values = Vec::with_capacity(dictionaries.len());
+ let mut value_slices = Vec::with_capacity(dictionaries.len());
+
+ for (idx, dictionary) in dictionaries.iter().enumerate() {
+ let mask = masks.and_then(|m| m.get(idx));
+ let key_mask = match (dictionary.logical_nulls(), mask) {
+ (Some(n), None) => Some(n.into_inner()),
+ (None, Some(n)) => Some(n.clone()),
+ (Some(n), Some(m)) => Some(n.inner() & m),
+ (None, None) => None,
+ };
+ let keys = dictionary.keys().values();
+ let values_mask = compute_values_mask(keys, key_mask.as_ref());
+ let v = dictionary.values().as_ref();
+ num_values += v.len();
+ value_slices.push(get_masked_values(v, &values_mask));
+ values.push(v)
+ }
+
+ // Map from value to new index
+ let mut interner = Interner::new(num_values);
+ // Interleave indices for new values array
+ let mut indices = Vec::with_capacity(num_values);
+
+ // Compute the mapping for each dictionary
+ let key_mappings = dictionaries
+ .iter()
+ .enumerate()
+ .zip(value_slices)
+ .map(|((dictionary_idx, dictionary), values)| {
+ let zero = K::Native::from_usize(0).unwrap();
+ let mut mapping = vec![zero; dictionary.values().len()];
+
+ for (value_idx, value) in values {
+ mapping[value_idx] = *interner.intern(value, || {
+ match K::Native::from_usize(indices.len()) {
+ Some(idx) => {
+ indices.push((dictionary_idx, value_idx));
+ Ok(idx)
+ }
+ None => Err(ArrowError::DictionaryKeyOverflowError),
+ }
+ })?;
+ }
+ Ok(mapping)
+ })
+ .collect::<Result<Vec<_>, ArrowError>>()?;
+
+ Ok(MergedDictionaries {
+ key_mappings,
+ values: interleave(&values, &indices)?,
+ })
+}
+
+/// Return a mask identifying the values that are referenced by `keys`
+/// at the positions set in `mask` (all positions when `mask` is `None`)
+fn compute_values_mask<K: ArrowNativeType>(
+ keys: &ScalarBuffer<K>,
+ mask: Option<&BooleanBuffer>,
+) -> BooleanBuffer {
+ let mut builder = BooleanBufferBuilder::new(keys.len());
+ builder.advance(keys.len());
+
+ match mask {
+ Some(n) => n
+ .set_indices()
+ .for_each(|idx| builder.set_bit(keys[idx].as_usize(), true)),
+ None => keys
+ .iter()
+ .for_each(|k| builder.set_bit(k.as_usize(), true)),
+ }
+ builder.finish()
+}
+
+/// Return a Vec containing, for each set index in `mask`, that index and its byte value
+fn get_masked_values<'a>(
+ array: &'a dyn Array,
+ mask: &BooleanBuffer,
+) -> Vec<(usize, &'a [u8])> {
+ match array.data_type() {
+ DataType::Utf8 => masked_bytes(array.as_string::<i32>(), mask),
+ DataType::LargeUtf8 => masked_bytes(array.as_string::<i64>(), mask),
+ DataType::Binary => masked_bytes(array.as_binary::<i32>(), mask),
+ DataType::LargeBinary => masked_bytes(array.as_binary::<i64>(), mask),
+ _ => unimplemented!(),
+ }
+}
+
+/// Compute [`get_masked_values`] for a [`GenericByteArray`]
+///
+/// Note: this does not check the null mask and will return values contained in null slots
+fn masked_bytes<'a, T: ByteArrayType>(
+ array: &'a GenericByteArray<T>,
+ mask: &BooleanBuffer,
+) -> Vec<(usize, &'a [u8])> {
+ let mut out = Vec::with_capacity(mask.count_set_bits());
+ for idx in mask.set_indices() {
+ out.push((idx, array.value(idx).as_ref()))
+ }
+ out
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::dictionary::merge_dictionary_values;
+ use arrow_array::cast::as_string_array;
+ use arrow_array::types::Int32Type;
+ use arrow_array::{DictionaryArray, Int32Array, StringArray};
+ use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+ use std::sync::Arc;
+
+ #[test]
+ fn test_merge_strings() {
+ let a =
+ DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]);
+ let b = DictionaryArray::<Int32Type>::from_iter(["c", "f", "c", "d", "a", "d"]);
+ let merged = merge_dictionary_values(&[&a, &b], None).unwrap();
+
+ let values = as_string_array(merged.values.as_ref());
+ let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
+ assert_eq!(&actual, &["a", "b", "d", "c", "e", "f"]);
+
+ assert_eq!(merged.key_mappings.len(), 2);
+ assert_eq!(&merged.key_mappings[0], &[0, 1, 2, 3, 4]);
+ assert_eq!(&merged.key_mappings[1], &[3, 5, 2, 0]);
+
+ let a_slice = a.slice(1, 4);
+ let merged = merge_dictionary_values(&[&a_slice, &b], None).unwrap();
+
+ let values = as_string_array(merged.values.as_ref());
+ let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
+ assert_eq!(&actual, &["a", "b", "d", "c", "f"]);
+
+ assert_eq!(merged.key_mappings.len(), 2);
+ assert_eq!(&merged.key_mappings[0], &[0, 1, 2, 0, 0]);
+ assert_eq!(&merged.key_mappings[1], &[3, 4, 2, 0]);
+
+ // Mask out only ["b", "b", "d"] from a
+ let a_mask =
+ BooleanBuffer::from_iter([false, true, false, true, true, false, false]);
+ let b_mask = BooleanBuffer::new_set(b.len());
+ let merged = merge_dictionary_values(&[&a, &b], Some(&[a_mask, b_mask])).unwrap();
+
+ let values = as_string_array(merged.values.as_ref());
+ let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
+ assert_eq!(&actual, &["b", "d", "c", "f", "a"]);
+
+ assert_eq!(merged.key_mappings.len(), 2);
+ assert_eq!(&merged.key_mappings[0], &[0, 0, 1, 0, 0]);
+ assert_eq!(&merged.key_mappings[1], &[2, 3, 1, 4]);
+ }
+
+ #[test]
+ fn test_merge_nulls() {
+ let buffer = Buffer::from("helloworldbingohelloworld");
+ let offsets = OffsetBuffer::from_lengths([5, 5, 5, 5, 5]);
+ let nulls = NullBuffer::from(vec![true, false, true, true, true]);
+ let values = StringArray::new(offsets, buffer, Some(nulls));
+
+ let key_values = vec![1, 2, 3, 1, 8, 2, 3];
+ let key_nulls =
+ NullBuffer::from(vec![true, true, false, true, false, true, true]);
+ let keys = Int32Array::new(key_values.into(), Some(key_nulls));
+ let a = DictionaryArray::new(keys, Arc::new(values));
+ // [NULL, "bingo", NULL, NULL, NULL, "bingo", "hello"]
+
+ let b = DictionaryArray::new(
+ Int32Array::new_null(10),
+ Arc::new(StringArray::new_null(0)),
+ );
+
+ let merged = merge_dictionary_values(&[&a, &b], None).unwrap();
+ let expected = StringArray::from(vec!["bingo", "hello"]);
+ assert_eq!(merged.values.as_ref(), &expected);
+ assert_eq!(merged.key_mappings.len(), 2);
+ assert_eq!(&merged.key_mappings[0], &[0, 0, 0, 1, 0]);
+ assert_eq!(&merged.key_mappings[1], &[]);
+ }
+}
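
A note on the Interner's sizing, worked through as a sketch: the bucket count
is a power of two comfortably above the capacity hint, and a hash selects a
bucket by its top bits. Collisions overwrite the previous entry, which is why
the merge is best-effort (33 rather than 30 values in
test_string_dictionary_merge above).

    fn main() {
        // Mirrors Interner::new with capacity = 100:
        let capacity: u64 = 100;
        let shift = (capacity + 128).leading_zeros(); // 228 fits in 8 bits -> 56
        let num_buckets = (u64::MAX >> shift) as usize + 1;
        assert_eq!(num_buckets, 256);

        // Interner::intern keeps the top (64 - shift) bits of the hash:
        let hash: u64 = 0xDEAD_BEEF_DEAD_BEEF;
        assert!(((hash >> shift) as usize) < num_buckets);
    }
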
diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs
index c0d2026808..a0f4166651 100644
--- a/arrow-select/src/interleave.rs
+++ b/arrow-select/src/interleave.rs
@@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
+use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
+use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder};
+use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
-use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
+use arrow_buffer::{
+ ArrowNativeType, MutableBuffer, NullBuffer, NullBufferBuilder, OffsetBuffer,
+};
use arrow_data::transform::MutableArrayData;
-use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType};
use std::sync::Arc;
@@ -30,6 +33,12 @@ macro_rules! primitive_helper {
};
}
+macro_rules! dict_helper {
+ ($t:ty, $values:expr, $indices:expr) => {
+ Ok(Arc::new(interleave_dictionaries::<$t>($values, $indices)?) as _)
+ };
+}
+
///
/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
///
@@ -87,6 +96,10 @@ pub fn interleave(
DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
+ DataType::Dictionary(k, _) => downcast_integer! {
+ k.as_ref() => (dict_helper, values, indices),
+ _ => unreachable!("illegal dictionary key type {k}")
+ },
_ => interleave_fallback(values, indices)
}
}
@@ -97,10 +110,8 @@ pub fn interleave(
struct Interleave<'a, T> {
/// The input arrays downcast to T
arrays: Vec<&'a T>,
- /// The number of nulls in the interleaved output
- null_count: usize,
/// The null buffer of the interleaved output
- nulls: Option<Buffer>,
+ nulls: Option<NullBuffer>,
}
impl<'a, T: Array + 'static> Interleave<'a, T> {
@@ -114,22 +125,19 @@ impl<'a, T: Array + 'static> Interleave<'a, T> {
})
.collect();
- let mut null_count = 0;
- let nulls = has_nulls.then(|| {
- let mut builder = BooleanBufferBuilder::new(indices.len());
- for (a, b) in indices {
- let v = arrays[*a].is_valid(*b);
- null_count += !v as usize;
- builder.append(v)
+ let nulls = match has_nulls {
+ true => {
+ let mut builder = NullBufferBuilder::new(indices.len());
+ for (a, b) in indices {
+ let v = arrays[*a].is_valid(*b);
+ builder.append(v)
+ }
+ builder.finish()
}
- builder.into()
- });
+ false => None,
+ };
- Self {
- arrays,
- null_count,
- nulls,
- }
+ Self { arrays, nulls }
}
}
@@ -140,20 +148,14 @@ fn interleave_primitive<T: ArrowPrimitiveType>(
) -> Result<ArrayRef, ArrowError> {
let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
- let mut values = BufferBuilder::<T::Native>::new(indices.len());
+ let mut values = Vec::with_capacity(indices.len());
for (a, b) in indices {
let v = interleaved.arrays[*a].value(*b);
- values.append(v)
+ values.push(v)
}
- let builder = ArrayDataBuilder::new(data_type.clone())
- .len(indices.len())
- .add_buffer(values.finish())
- .null_bit_buffer(interleaved.nulls)
- .null_count(interleaved.null_count);
-
- let data = unsafe { builder.build_unchecked() };
- Ok(Arc::new(PrimitiveArray::<T>::from(data)))
+ let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
+ Ok(Arc::new(array.with_data_type(data_type.clone())))
}
fn interleave_bytes<T: ByteArrayType>(
@@ -177,15 +179,55 @@ fn interleave_bytes<T: ByteArrayType>(
values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
}
- let builder = ArrayDataBuilder::new(T::DATA_TYPE)
- .len(indices.len())
- .add_buffer(offsets.finish())
- .add_buffer(values.into())
- .null_bit_buffer(interleaved.nulls)
- .null_count(interleaved.null_count);
+ // Safety: safe by construction
+ let array = unsafe {
+ let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
+ GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
+ };
+ Ok(Arc::new(array))
+}
+
+fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
+ arrays: &[&dyn Array],
+ indices: &[(usize, usize)],
+) -> Result<ArrayRef, ArrowError> {
+ let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
+ if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
+ return interleave_fallback(arrays, indices);
+ }
+
+ let masks: Vec<_> = dictionaries
+ .iter()
+ .enumerate()
+ .map(|(a_idx, dictionary)| {
+ let mut key_mask = BooleanBufferBuilder::new_from_buffer(
+ MutableBuffer::new_null(dictionary.len()),
+ dictionary.len(),
+ );
+
+ for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
+ key_mask.set_bit(*key_idx, true);
+ }
+ key_mask.finish()
+ })
+ .collect();
+
+ let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
- let data = unsafe { builder.build_unchecked() };
- Ok(Arc::new(GenericByteArray::<T>::from(data)))
+ // Recompute keys
+ let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
+ for (a, b) in indices {
+ let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
+ match old_keys.is_valid(*b) {
+ true => {
+ let old_key = old_keys.values()[*b];
+ keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
+ }
+ false => keys.append_null(),
+ }
+ }
+ let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
+ Ok(Arc::new(array))
}
/// Fallback implementation of interleave using [`MutableArrayData`]
@@ -280,6 +322,32 @@ mod tests {
)
}
+ #[test]
+ fn test_interleave_dictionary() {
+ let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
+ let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
+
+ // Should not recompute dictionary
+ let values =
+ interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)])
+ .unwrap();
+ let v = values.as_dictionary::<Int32Type>();
+ assert_eq!(v.values().len(), 5);
+
+ let vc = v.downcast_dict::<StringArray>().unwrap();
+ let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
+ assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
+
+ // Should recompute dictionary
+ let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
+ let v = values.as_dictionary::<Int32Type>();
+ assert_eq!(v.values().len(), 1);
+
+ let vc = v.downcast_dict::<StringArray>().unwrap();
+ let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
+ assert_eq!(&collected, &["c", "c", "c"]);
+ }
+
#[test]
fn test_lists() {
// [[1, 2], null, [3]]
@@ -323,4 +391,25 @@ mod tests {
assert_eq!(v, &expected);
}
+
+ #[test]
+ fn interleave_sparse_nulls() {
+ let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
+ let keys = Int32Array::from_iter_values(0..10);
+ let dict_a = DictionaryArray::new(keys, Arc::new(values));
+ let values = StringArray::new_null(0);
+ let keys = Int32Array::new_null(10);
+ let dict_b = DictionaryArray::new(keys, Arc::new(values));
+
+ let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
+ let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
+
+ let expected = DictionaryArray::<Int32Type>::from_iter(vec![
+ Some("0"),
+ Some("1"),
+ Some("2"),
+ None,
+ ]);
+ assert_eq!(array.as_ref(), &expected)
+ }
}
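
For reference, the interleave kernel extended above selects rows by
(array_index, row_index) pairs; a minimal usage sketch with primitive arrays:

    use arrow_array::Int32Array;
    use arrow_select::interleave::interleave;

    fn main() {
        let a = Int32Array::from(vec![1, 2, 3]);
        let b = Int32Array::from(vec![40, 50]);

        // Take a[0], then b[1], then a[2]
        let out = interleave(&[&a, &b], &[(0, 0), (1, 1), (0, 2)]).unwrap();

        let expected = Int32Array::from(vec![1, 50, 3]);
        assert_eq!(out.as_ref(), &expected);
    }
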
diff --git a/arrow-select/src/lib.rs b/arrow-select/src/lib.rs
index c468e20a51..82f57a6af4 100644
--- a/arrow-select/src/lib.rs
+++ b/arrow-select/src/lib.rs
@@ -18,6 +18,7 @@
//! Arrow selection kernels
pub mod concat;
+mod dictionary;
pub mod filter;
pub mod interleave;
pub mod nullif;
diff --git a/arrow/benches/concatenate_kernel.rs b/arrow/benches/concatenate_kernel.rs
index 3fff2abd17..2f5b654394 100644
--- a/arrow/benches/concatenate_kernel.rs
+++ b/arrow/benches/concatenate_kernel.rs
@@ -60,6 +60,28 @@ fn add_benchmark(c: &mut Criterion) {
c.bench_function("concat str nulls 1024", |b| {
b.iter(|| bench_concat(&v1, &v2))
});
+
+ let v1 = create_string_array_with_len::<i32>(10, 0.0, 20);
+ let v1 = create_dict_from_values::<Int32Type>(1024, 0.0, &v1);
+ let v2 = create_string_array_with_len::<i32>(10, 0.0, 20);
+ let v2 = create_dict_from_values::<Int32Type>(1024, 0.0, &v2);
+ c.bench_function("concat str_dict 1024", |b| {
+ b.iter(|| bench_concat(&v1, &v2))
+ });
+
+ let v1 = create_string_array_with_len::<i32>(1024, 0.0, 20);
+ let v1 = create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &v1, 10..20);
+ let v2 = create_string_array_with_len::<i32>(1024, 0.0, 20);
+ let v2 = create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &v2, 30..40);
+ c.bench_function("concat str_dict_sparse 1024", |b| {
+ b.iter(|| bench_concat(&v1, &v2))
+ });
+
+ let v1 = create_string_array::<i32>(1024, 0.5);
+ let v2 = create_string_array::<i32>(1024, 0.5);
+ c.bench_function("concat str nulls 1024", |b| {
+ b.iter(|| bench_concat(&v1, &v2))
+ });
}
criterion_group!(benches, add_benchmark);
diff --git a/arrow/benches/interleave_kernels.rs b/arrow/benches/interleave_kernels.rs
index 2bb430e40b..454d914080 100644
--- a/arrow/benches/interleave_kernels.rs
+++ b/arrow/benches/interleave_kernels.rs
@@ -37,14 +37,21 @@ fn do_bench(
base: &dyn Array,
slices: &[Range<usize>],
) {
- let mut rng = seedable_rng();
-
let arrays: Vec<_> = slices
.iter()
.map(|r| base.slice(r.start, r.end - r.start))
.collect();
let values: Vec<_> = arrays.iter().map(|x| x.as_ref()).collect();
+ bench_values(
+ c,
+ &format!("interleave {prefix} {len} {slices:?}"),
+ len,
+ &values,
+ );
+}
+fn bench_values(c: &mut Criterion, name: &str, len: usize, values: &[&dyn Array]) {
+ let mut rng = seedable_rng();
let indices: Vec<_> = (0..len)
.map(|_| {
let array_idx = rng.gen_range(0..values.len());
@@ -53,8 +60,8 @@ fn do_bench(
})
.collect();
- c.bench_function(&format!("interleave {prefix} {len} {slices:?}"), |b| {
- b.iter(|| criterion::black_box(interleave(&values, &indices).unwrap()))
+ c.bench_function(name, |b| {
+ b.iter(|| criterion::black_box(interleave(values, &indices).unwrap()))
});
}
@@ -63,12 +70,20 @@ fn add_benchmark(c: &mut Criterion) {
let i32_opt = create_primitive_array::<Int32Type>(1024, 0.5);
let string = create_string_array_with_len::<i32>(1024, 0., 20);
let string_opt = create_string_array_with_len::<i32>(1024, 0.5, 20);
+ let values = create_string_array_with_len::<i32>(10, 0.0, 20);
+ let dict = create_dict_from_values::<Int32Type>(1024, 0.0, &values);
+
+ let values = create_string_array_with_len::<i32>(1024, 0.0, 20);
+ let sparse_dict =
+ create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &values, 10..20);
let cases: &[(&str, &dyn Array)] = &[
("i32(0.0)", &i32),
("i32(0.5)", &i32_opt),
("str(20, 0.0)", &string),
("str(20, 0.5)", &string_opt),
+ ("dict(20, 0.0)", &dict),
+ ("dict_sparse(20, 0.0)", &sparse_dict),
];
for (prefix, base) in cases {
@@ -83,6 +98,15 @@ fn add_benchmark(c: &mut Criterion) {
do_bench(c, prefix, *len, *base, slice);
}
}
+
+ for len in [100, 1024, 2048] {
+ bench_values(
+ c,
+ &format!("interleave dict_distinct {len}"),
+ 100,
+ &[&dict, &sparse_dict],
+ );
+ }
}
criterion_group!(benches, add_benchmark);
diff --git a/arrow/src/util/bench_util.rs b/arrow/src/util/bench_util.rs
index 9bdc247837..5e5f4c6ee1 100644
--- a/arrow/src/util/bench_util.rs
+++ b/arrow/src/util/bench_util.rs
@@ -29,6 +29,7 @@ use rand::{
distributions::{Alphanumeric, Distribution, Standard},
prelude::StdRng,
};
+use std::ops::Range;
/// Creates a random (but fixed-seeded) array of a given size and null density
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
@@ -268,6 +269,24 @@ pub fn create_dict_from_values<K>(
null_density: f32,
values: &dyn Array,
) -> DictionaryArray<K>
+where
+ K: ArrowDictionaryKeyType,
+ Standard: Distribution<K::Native>,
+ K::Native: SampleUniform,
+{
+ let min_key = K::Native::from_usize(0).unwrap();
+ let max_key = K::Native::from_usize(values.len()).unwrap();
+ create_sparse_dict_from_values(size, null_density, values, min_key..max_key)
+}
+
+/// Creates a random (but fixed-seeded) dictionary array of a given size and null density
+/// with the provided values array and key range
+pub fn create_sparse_dict_from_values<K>(
+ size: usize,
+ null_density: f32,
+ values: &dyn Array,
+ key_range: Range<K::Native>,
+) -> DictionaryArray<K>
where
K: ArrowDictionaryKeyType,
Standard: Distribution<K::Native>,
@@ -279,9 +298,9 @@ where
Box::new(values.data_type().clone()),
);
- let min_key = K::Native::from_usize(0).unwrap();
- let max_key = K::Native::from_usize(values.len()).unwrap();
- let keys: Buffer = (0..size).map(|_| rng.gen_range(min_key..max_key)).collect();
+ let keys: Buffer = (0..size)
+ .map(|_| rng.gen_range(key_range.clone()))
+ .collect();
let nulls: Option<Buffer> = (null_density != 0.)
.then(|| (0..size).map(|_| rng.gen_bool(null_density as _)).collect());
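
The new create_sparse_dict_from_values helper draws every key from key_range,
leaving the remaining dictionary values unreferenced; this is what exercises
the masked merge path in the benchmarks above. A usage sketch (assumes the
arrow crate's test_utils feature, which exposes bench_util):

    use arrow::datatypes::Int32Type;
    use arrow::util::bench_util::{
        create_sparse_dict_from_values, create_string_array_with_len,
    };

    fn main() {
        let values = create_string_array_with_len::<i32>(1024, 0.0, 20);
        // 1024 keys drawn only from 10..20: ten of the 1024 values are used
        let dict = create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &values, 10..20);
        assert_eq!(dict.len(), 1024);
    }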