You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/09/05 15:02:52 UTC

[arrow-rs] branch master updated: Re-encode dictionaries in selection kernels (#3558)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 65edbb1702 Re-encode dictionaries in selection kernels (#3558)
65edbb1702 is described below

commit 65edbb1702e25420aacebe656dcd789690f72c82
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Sep 5 16:02:46 2023 +0100

    Re-encode dictionaries in selection kernels (#3558)
    
    * Re-encode dictionaries in selection kernels
    
    * More benchmarks
    
    * Best-effort hashing
    
    * More benchmarks
    
    * Add fallback to concatenating dictionaries
    
    * Fix nulls
    
    * Format
    
    * Cleanup
    
    * RAT
    
    * Clippy
    
    * Split out heuristic
    
    * Add support to interleave kernel
    
    * Clippy
    
    * More clippy
    
    * Clippy
    
    * Cleanup
    
    * Optimize concat
    
    * Review feedback
    
    * Clippy
    
    * Improved null handling
    
    * Further tests
    
    * Faster ptr_eq
---
 arrow-buffer/src/buffer/immutable.rs |   8 +
 arrow-buffer/src/buffer/offset.rs    |   8 +
 arrow-buffer/src/buffer/scalar.rs    |   8 +
 arrow-select/Cargo.toml              |   1 +
 arrow-select/src/concat.rs           | 186 ++++++++++++++-----
 arrow-select/src/dictionary.rs       | 333 +++++++++++++++++++++++++++++++++++
 arrow-select/src/interleave.rs       | 165 +++++++++++++----
 arrow-select/src/lib.rs              |   1 +
 arrow/benches/concatenate_kernel.rs  |  22 +++
 arrow/benches/interleave_kernels.rs  |  32 +++-
 arrow/src/util/bench_util.rs         |  25 ++-
 11 files changed, 698 insertions(+), 91 deletions(-)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 8296d3fbcc..bda6dfc5cd 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -323,6 +323,14 @@ impl Buffer {
                 length,
             })
     }
+
+    /// Returns true if this [`Buffer`] is equal to `other`, using pointer comparisons
+    /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+    /// return false when the buffers are logically equal
+    #[inline]
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        self.ptr == other.ptr && self.length == other.length
+    }
 }
 
 /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs
index fede32c579..a6f2f7f6cf 100644
--- a/arrow-buffer/src/buffer/offset.rs
+++ b/arrow-buffer/src/buffer/offset.rs
@@ -148,6 +148,14 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
     pub fn slice(&self, offset: usize, len: usize) -> Self {
         Self(self.0.slice(offset, len.saturating_add(1)))
     }
+
+    /// Returns true if this [`OffsetBuffer`] is equal to `other`, using pointer comparisons
+    /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+    /// return false when the buffers are logically equal
+    #[inline]
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        self.0.ptr_eq(&other.0)
+    }
 }
 
 impl<T: ArrowNativeType> Deref for OffsetBuffer<T> {
diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs
index 70c86f1186..276e635e82 100644
--- a/arrow-buffer/src/buffer/scalar.rs
+++ b/arrow-buffer/src/buffer/scalar.rs
@@ -86,6 +86,14 @@ impl<T: ArrowNativeType> ScalarBuffer<T> {
     pub fn into_inner(self) -> Buffer {
         self.buffer
     }
+
+    /// Returns true if this [`ScalarBuffer`] is equal to `other`, using pointer comparisons
+    /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+    /// return false when the buffers are logically equal
+    #[inline]
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        self.buffer.ptr_eq(&other.buffer)
+    }
 }
 
 impl<T: ArrowNativeType> Deref for ScalarBuffer<T> {
diff --git a/arrow-select/Cargo.toml b/arrow-select/Cargo.toml
index ff8a212c7b..023788799c 100644
--- a/arrow-select/Cargo.toml
+++ b/arrow-select/Cargo.toml
@@ -39,6 +39,7 @@ arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-array = { workspace = true }
 num = { version = "0.4", default-features = false, features = ["std"] }
+ahash = { version = "0.8", default-features = false}
 
 [features]
 default = []
diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index eed20699c2..c34c3d3d0c 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -30,20 +30,20 @@
 //! assert_eq!(arr.len(), 3);
 //! ```
 
+use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
+use arrow_array::cast::AsArray;
 use arrow_array::types::*;
 use arrow_array::*;
-use arrow_buffer::ArrowNativeType;
+use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer};
 use arrow_data::transform::{Capacities, MutableArrayData};
 use arrow_schema::{ArrowError, DataType, SchemaRef};
+use std::sync::Arc;
 
 fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
     let mut item_capacity = 0;
     let mut bytes_capacity = 0;
     for array in arrays {
-        let a = array
-            .as_any()
-            .downcast_ref::<GenericByteArray<T>>()
-            .unwrap();
+        let a = array.as_bytes::<T>();
 
         // Guaranteed to always have at least one element
         let offsets = a.value_offsets();
@@ -54,6 +54,59 @@ fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
     Capacities::Binary(item_capacity, Some(bytes_capacity))
 }
 
+fn concat_dictionaries<K: ArrowDictionaryKeyType>(
+    arrays: &[&dyn Array],
+) -> Result<ArrayRef, ArrowError> {
+    let mut output_len = 0;
+    let dictionaries: Vec<_> = arrays
+        .iter()
+        .map(|x| x.as_dictionary::<K>())
+        .inspect(|d| output_len += d.len())
+        .collect();
+
+    if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
+        return concat_fallback(arrays, Capacities::Array(output_len));
+    }
+
+    let merged = merge_dictionary_values(&dictionaries, None)?;
+
+    // Recompute keys
+    let mut key_values = Vec::with_capacity(output_len);
+
+    let mut has_nulls = false;
+    for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
+        has_nulls |= d.null_count() != 0;
+        for key in d.keys().values() {
+            // Use get to safely handle nulls
+            key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
+        }
+    }
+
+    let nulls = has_nulls.then(|| {
+        let mut nulls = BooleanBufferBuilder::new(output_len);
+        for d in &dictionaries {
+            match d.nulls() {
+                Some(n) => nulls.append_buffer(n.inner()),
+                None => nulls.append_n(d.len(), true),
+            }
+        }
+        NullBuffer::new(nulls.finish())
+    });
+
+    let keys = PrimitiveArray::<K>::new(key_values.into(), nulls);
+    // Sanity check
+    assert_eq!(keys.len(), output_len);
+
+    let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
+    Ok(Arc::new(array))
+}
+
+macro_rules! dict_helper {
+    ($t:ty, $arrays:expr) => {
+        return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
+    };
+}
+
 /// Concatenate multiple [Array] of the same type into a single [ArrayRef].
 pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
     if arrays.is_empty() {
@@ -78,9 +131,23 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
         DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
         DataType::Binary => binary_capacity::<BinaryType>(arrays),
         DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
+        DataType::Dictionary(k, _) => downcast_integer! {
+            k.as_ref() => (dict_helper, arrays),
+            _ => unreachable!("illegal dictionary key type {k}")
+        },
         _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
     };
 
+    concat_fallback(arrays, capacity)
+}
+
+/// Concatenates arrays using MutableArrayData
+///
+/// This will naively concatenate dictionaries
+fn concat_fallback(
+    arrays: &[&dyn Array],
+    capacity: Capacities,
+) -> Result<ArrayRef, ArrowError> {
     let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
     let array_data = array_data.iter().collect();
     let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
@@ -140,6 +207,7 @@ pub fn concat_batches<'a>(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow_array::builder::StringDictionaryBuilder;
     use arrow_array::cast::AsArray;
     use arrow_schema::{Field, Schema};
     use std::sync::Arc;
@@ -468,29 +536,10 @@ mod tests {
     }
 
     fn collect_string_dictionary(
-        dictionary: &DictionaryArray<Int32Type>,
-    ) -> Vec<Option<String>> {
-        let values = dictionary.values();
-        let values = values.as_any().downcast_ref::<StringArray>().unwrap();
-
-        dictionary
-            .keys()
-            .iter()
-            .map(|key| key.map(|key| values.value(key as _).to_string()))
-            .collect()
-    }
-
-    fn concat_dictionary(
-        input_1: DictionaryArray<Int32Type>,
-        input_2: DictionaryArray<Int32Type>,
-    ) -> Vec<Option<String>> {
-        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
-        let concat = concat
-            .as_any()
-            .downcast_ref::<DictionaryArray<Int32Type>>()
-            .unwrap();
-
-        collect_string_dictionary(concat)
+        array: &DictionaryArray<Int32Type>,
+    ) -> Vec<Option<&str>> {
+        let concrete = array.downcast_dict::<StringArray>().unwrap();
+        concrete.into_iter().collect()
     }
 
     #[test]
@@ -509,11 +558,19 @@ mod tests {
             "E",
         ]
         .into_iter()
-        .map(|x| Some(x.to_string()))
+        .map(Some)
         .collect();
 
-        let concat = concat_dictionary(input_1, input_2);
-        assert_eq!(concat, expected);
+        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
+        let dictionary = concat.as_dictionary::<Int32Type>();
+        let actual = collect_string_dictionary(dictionary);
+        assert_eq!(actual, expected);
+
+        // Should have concatenated inputs together
+        assert_eq!(
+            dictionary.values().len(),
+            input_1.values().len() + input_2.values().len(),
+        )
     }
 
     #[test]
@@ -523,16 +580,45 @@ mod tests {
                 .into_iter()
                 .collect();
         let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
-        let expected = vec![
-            Some("foo".to_string()),
-            Some("bar".to_string()),
-            None,
-            Some("fiz".to_string()),
-            None,
-        ];
+        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
 
-        let concat = concat_dictionary(input_1, input_2);
-        assert_eq!(concat, expected);
+        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
+        let dictionary = concat.as_dictionary::<Int32Type>();
+        let actual = collect_string_dictionary(dictionary);
+        assert_eq!(actual, expected);
+
+        // Should have concatenated inputs together
+        assert_eq!(
+            dictionary.values().len(),
+            input_1.values().len() + input_2.values().len(),
+        )
+    }
+
+    #[test]
+    fn test_string_dictionary_merge() {
+        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
+        for i in 0..20 {
+            builder.append(&i.to_string()).unwrap();
+        }
+        let input_1 = builder.finish();
+
+        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
+        for i in 0..30 {
+            builder.append(&i.to_string()).unwrap();
+        }
+        let input_2 = builder.finish();
+
+        let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
+        let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
+
+        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
+        let dictionary = concat.as_dictionary::<Int32Type>();
+        let actual = collect_string_dictionary(dictionary);
+        assert_eq!(actual, expected);
+
+        // Should have merged inputs together
+        // Not 30 as this is done on a best-effort basis
+        assert_eq!(dictionary.values().len(), 33)
     }
 
     #[test]
@@ -556,7 +642,7 @@ mod tests {
     fn test_dictionary_concat_reuse() {
         let array: DictionaryArray<Int8Type> =
             vec!["a", "a", "b", "c"].into_iter().collect();
-        let copy: DictionaryArray<Int8Type> = array.to_data().into();
+        let copy: DictionaryArray<Int8Type> = array.clone();
 
         // dictionary is "a", "b", "c"
         assert_eq!(
@@ -567,11 +653,7 @@ mod tests {
 
         // concatenate it with itself
         let combined = concat(&[&copy as _, &array as _]).unwrap();
-
-        let combined = combined
-            .as_any()
-            .downcast_ref::<DictionaryArray<Int8Type>>()
-            .unwrap();
+        let combined = combined.as_dictionary::<Int8Type>();
 
         assert_eq!(
             combined.values(),
@@ -738,4 +820,16 @@ mod tests {
         assert_eq!(data.buffers()[1].len(), 200);
         assert_eq!(data.buffers()[1].capacity(), 256); // Nearest multiple of 64
     }
+
+    #[test]
+    fn concat_sparse_nulls() {
+        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
+        let keys = Int32Array::from(vec![1; 10]);
+        let dict_a = DictionaryArray::new(keys, Arc::new(values));
+        let values = StringArray::new_null(0);
+        let keys = Int32Array::new_null(10);
+        let dict_b = DictionaryArray::new(keys, Arc::new(values));
+        let array = concat(&[&dict_a, &dict_b]).unwrap();
+        assert_eq!(array.null_count(), 10);
+    }
 }
diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs
new file mode 100644
index 0000000000..8630b332f0
--- /dev/null
+++ b/arrow-select/src/dictionary.rs
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::interleave::interleave;
+use ahash::RandomState;
+use arrow_array::builder::BooleanBufferBuilder;
+use arrow_array::cast::AsArray;
+use arrow_array::types::{
+    ArrowDictionaryKeyType, BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type,
+    Utf8Type,
+};
+use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray};
+use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer};
+use arrow_schema::{ArrowError, DataType};
+
+/// A best effort interner that maintains a fixed number of buckets
+/// and interns keys based on their hash value
+///
+/// Hash collisions will result in replacement
+struct Interner<'a, V> {
+    state: RandomState,
+    buckets: Vec<Option<(&'a [u8], V)>>,
+    shift: u32,
+}
+
+impl<'a, V> Interner<'a, V> {
+    /// Capacity controls the number of unique buckets allocated within the Interner
+    ///
+    /// A larger capacity reduces the probability of hash collisions, and should be set
+    /// based on an approximation of the upper bound of unique values
+    fn new(capacity: usize) -> Self {
+        // Add additional buckets to help reduce collisions
+        let shift = (capacity as u64 + 128).leading_zeros();
+        let num_buckets = (u64::MAX >> shift) as usize;
+        let buckets = (0..num_buckets.saturating_add(1)).map(|_| None).collect();
+        Self {
+            // A fixed seed to ensure deterministic behaviour
+            state: RandomState::with_seeds(0, 0, 0, 0),
+            buckets,
+            shift,
+        }
+    }
+
+    fn intern<F: FnOnce() -> Result<V, E>, E>(
+        &mut self,
+        new: &'a [u8],
+        f: F,
+    ) -> Result<&V, E> {
+        let hash = self.state.hash_one(new);
+        let bucket_idx = hash >> self.shift;
+        Ok(match &mut self.buckets[bucket_idx as usize] {
+            Some((current, v)) => {
+                if *current != new {
+                    *v = f()?;
+                    *current = new;
+                }
+                v
+            }
+            slot => &slot.insert((new, f()?)).1,
+        })
+    }
+}
+
+pub struct MergedDictionaries<K: ArrowDictionaryKeyType> {
+    /// Provides `key_mappings[array_idx][old_key] -> new_key`
+    pub key_mappings: Vec<Vec<K::Native>>,
+    /// The new values
+    pub values: ArrayRef,
+}
+
+/// Performs a cheap, pointer-based comparison of two byte arrays
+///
+/// See [`ScalarBuffer::ptr_eq`]
+fn bytes_ptr_eq<T: ByteArrayType>(a: &dyn Array, b: &dyn Array) -> bool {
+    match (a.as_bytes_opt::<T>(), b.as_bytes_opt::<T>()) {
+        (Some(a), Some(b)) => {
+            let values_eq =
+                a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets());
+            match (a.nulls(), b.nulls()) {
+                (Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()),
+                (None, None) => values_eq,
+                _ => false,
+            }
+        }
+        _ => false,
+    }
+}
+
+/// A type-erased function that compares two arrays for pointer equality
+type PtrEq = dyn Fn(&dyn Array, &dyn Array) -> bool;
+
+/// A weak heuristic of whether to merge dictionary values that aims to only
+/// perform the expensive merge computation when it is likely to yield at least
+/// some return over the naive approach used by MutableArrayData
+///
+/// `len` is the total length of the merged output
+pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(
+    dictionaries: &[&DictionaryArray<K>],
+    len: usize,
+) -> bool {
+    use DataType::*;
+    let first_values = dictionaries[0].values().as_ref();
+    let ptr_eq: Box<PtrEq> = match first_values.data_type() {
+        Utf8 => Box::new(bytes_ptr_eq::<Utf8Type>),
+        LargeUtf8 => Box::new(bytes_ptr_eq::<LargeUtf8Type>),
+        Binary => Box::new(bytes_ptr_eq::<BinaryType>),
+        LargeBinary => Box::new(bytes_ptr_eq::<LargeBinaryType>),
+        _ => return false,
+    };
+
+    let mut single_dictionary = true;
+    let mut total_values = first_values.len();
+    for dict in dictionaries.iter().skip(1) {
+        let values = dict.values().as_ref();
+        total_values += values.len();
+        if single_dictionary {
+            single_dictionary = ptr_eq(first_values, values)
+        }
+    }
+
+    let overflow = K::Native::from_usize(total_values).is_none();
+    let values_exceed_length = total_values >= len;
+
+    !single_dictionary && (overflow || values_exceed_length)
+}
+
+/// Given an array of dictionaries and an optional key mask compute a values array
+/// containing referenced values, along with mappings from the [`DictionaryArray`]
+/// keys to the new keys within this values array. Best-effort will be made to ensure
+/// that the dictionary values are unique
+///
+/// This method is meant to be very fast and the output dictionary values
+/// may not be unique, unlike `GenericByteDictionaryBuilder` which is slower
+/// but produces unique values
+pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(
+    dictionaries: &[&DictionaryArray<K>],
+    masks: Option<&[BooleanBuffer]>,
+) -> Result<MergedDictionaries<K>, ArrowError> {
+    let mut num_values = 0;
+
+    let mut values = Vec::with_capacity(dictionaries.len());
+    let mut value_slices = Vec::with_capacity(dictionaries.len());
+
+    for (idx, dictionary) in dictionaries.iter().enumerate() {
+        let mask = masks.and_then(|m| m.get(idx));
+        let key_mask = match (dictionary.logical_nulls(), mask) {
+            (Some(n), None) => Some(n.into_inner()),
+            (None, Some(n)) => Some(n.clone()),
+            (Some(n), Some(m)) => Some(n.inner() & m),
+            (None, None) => None,
+        };
+        let keys = dictionary.keys().values();
+        let values_mask = compute_values_mask(keys, key_mask.as_ref());
+        let v = dictionary.values().as_ref();
+        num_values += v.len();
+        value_slices.push(get_masked_values(v, &values_mask));
+        values.push(v)
+    }
+
+    // Map from value to new index
+    let mut interner = Interner::new(num_values);
+    // Interleave indices for new values array
+    let mut indices = Vec::with_capacity(num_values);
+
+    // Compute the mapping for each dictionary
+    let key_mappings = dictionaries
+        .iter()
+        .enumerate()
+        .zip(value_slices)
+        .map(|((dictionary_idx, dictionary), values)| {
+            let zero = K::Native::from_usize(0).unwrap();
+            let mut mapping = vec![zero; dictionary.values().len()];
+
+            for (value_idx, value) in values {
+                mapping[value_idx] = *interner.intern(value, || {
+                    match K::Native::from_usize(indices.len()) {
+                        Some(idx) => {
+                            indices.push((dictionary_idx, value_idx));
+                            Ok(idx)
+                        }
+                        None => Err(ArrowError::DictionaryKeyOverflowError),
+                    }
+                })?;
+            }
+            Ok(mapping)
+        })
+        .collect::<Result<Vec<_>, ArrowError>>()?;
+
+    Ok(MergedDictionaries {
+        key_mappings,
+        values: interleave(&values, &indices)?,
+    })
+}
+
+/// Return a mask identifying the values that are referenced by `keys`
+/// at the positions set in `mask`, or by every key when `mask` is `None`
+fn compute_values_mask<K: ArrowNativeType>(
+    keys: &ScalarBuffer<K>,
+    mask: Option<&BooleanBuffer>,
+) -> BooleanBuffer {
+    let mut builder = BooleanBufferBuilder::new(keys.len());
+    builder.advance(keys.len());
+
+    match mask {
+        Some(n) => n
+            .set_indices()
+            .for_each(|idx| builder.set_bit(keys[idx].as_usize(), true)),
+        None => keys
+            .iter()
+            .for_each(|k| builder.set_bit(k.as_usize(), true)),
+    }
+    builder.finish()
+}
+
+/// Return a Vec containing, for each set index in `mask`, that index and the byte value at that index
+fn get_masked_values<'a>(
+    array: &'a dyn Array,
+    mask: &BooleanBuffer,
+) -> Vec<(usize, &'a [u8])> {
+    match array.data_type() {
+        DataType::Utf8 => masked_bytes(array.as_string::<i32>(), mask),
+        DataType::LargeUtf8 => masked_bytes(array.as_string::<i64>(), mask),
+        DataType::Binary => masked_bytes(array.as_binary::<i32>(), mask),
+        DataType::LargeBinary => masked_bytes(array.as_binary::<i64>(), mask),
+        _ => unimplemented!(),
+    }
+}
+
+/// Compute [`get_masked_values`] for a [`GenericByteArray`]
+///
+/// Note: this does not check the null mask and will return values contained in null slots
+fn masked_bytes<'a, T: ByteArrayType>(
+    array: &'a GenericByteArray<T>,
+    mask: &BooleanBuffer,
+) -> Vec<(usize, &'a [u8])> {
+    let mut out = Vec::with_capacity(mask.count_set_bits());
+    for idx in mask.set_indices() {
+        out.push((idx, array.value(idx).as_ref()))
+    }
+    out
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::dictionary::merge_dictionary_values;
+    use arrow_array::cast::as_string_array;
+    use arrow_array::types::Int32Type;
+    use arrow_array::{DictionaryArray, Int32Array, StringArray};
+    use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_merge_strings() {
+        let a =
+            DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]);
+        let b = DictionaryArray::<Int32Type>::from_iter(["c", "f", "c", "d", "a", "d"]);
+        let merged = merge_dictionary_values(&[&a, &b], None).unwrap();
+
+        let values = as_string_array(merged.values.as_ref());
+        let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
+        assert_eq!(&actual, &["a", "b", "d", "c", "e", "f"]);
+
+        assert_eq!(merged.key_mappings.len(), 2);
+        assert_eq!(&merged.key_mappings[0], &[0, 1, 2, 3, 4]);
+        assert_eq!(&merged.key_mappings[1], &[3, 5, 2, 0]);
+
+        let a_slice = a.slice(1, 4);
+        let merged = merge_dictionary_values(&[&a_slice, &b], None).unwrap();
+
+        let values = as_string_array(merged.values.as_ref());
+        let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
+        assert_eq!(&actual, &["a", "b", "d", "c", "f"]);
+
+        assert_eq!(merged.key_mappings.len(), 2);
+        assert_eq!(&merged.key_mappings[0], &[0, 1, 2, 0, 0]);
+        assert_eq!(&merged.key_mappings[1], &[3, 4, 2, 0]);
+
+        // Mask out only ["b", "b", "d"] from a
+        let a_mask =
+            BooleanBuffer::from_iter([false, true, false, true, true, false, false]);
+        let b_mask = BooleanBuffer::new_set(b.len());
+        let merged = merge_dictionary_values(&[&a, &b], Some(&[a_mask, b_mask])).unwrap();
+
+        let values = as_string_array(merged.values.as_ref());
+        let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
+        assert_eq!(&actual, &["b", "d", "c", "f", "a"]);
+
+        assert_eq!(merged.key_mappings.len(), 2);
+        assert_eq!(&merged.key_mappings[0], &[0, 0, 1, 0, 0]);
+        assert_eq!(&merged.key_mappings[1], &[2, 3, 1, 4]);
+    }
+
+    #[test]
+    fn test_merge_nulls() {
+        let buffer = Buffer::from("helloworldbingohelloworld");
+        let offsets = OffsetBuffer::from_lengths([5, 5, 5, 5, 5]);
+        let nulls = NullBuffer::from(vec![true, false, true, true, true]);
+        let values = StringArray::new(offsets, buffer, Some(nulls));
+
+        let key_values = vec![1, 2, 3, 1, 8, 2, 3];
+        let key_nulls =
+            NullBuffer::from(vec![true, true, false, true, false, true, true]);
+        let keys = Int32Array::new(key_values.into(), Some(key_nulls));
+        let a = DictionaryArray::new(keys, Arc::new(values));
+        // [NULL, "bingo", NULL, NULL, NULL, "bingo", "hello"]
+
+        let b = DictionaryArray::new(
+            Int32Array::new_null(10),
+            Arc::new(StringArray::new_null(0)),
+        );
+
+        let merged = merge_dictionary_values(&[&a, &b], None).unwrap();
+        let expected = StringArray::from(vec!["bingo", "hello"]);
+        assert_eq!(merged.values.as_ref(), &expected);
+        assert_eq!(merged.key_mappings.len(), 2);
+        assert_eq!(&merged.key_mappings[0], &[0, 0, 0, 1, 0]);
+        assert_eq!(&merged.key_mappings[1], &[]);
+    }
+}
diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs
index c0d2026808..a0f4166651 100644
--- a/arrow-select/src/interleave.rs
+++ b/arrow-select/src/interleave.rs
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
+use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
+use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder};
+use arrow_array::cast::AsArray;
 use arrow_array::types::*;
 use arrow_array::*;
-use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
+use arrow_buffer::{
+    ArrowNativeType, MutableBuffer, NullBuffer, NullBufferBuilder, OffsetBuffer,
+};
 use arrow_data::transform::MutableArrayData;
-use arrow_data::ArrayDataBuilder;
 use arrow_schema::{ArrowError, DataType};
 use std::sync::Arc;
 
@@ -30,6 +33,12 @@ macro_rules! primitive_helper {
     };
 }
 
+macro_rules! dict_helper {
+    ($t:ty, $values:expr, $indices:expr) => {
+        Ok(Arc::new(interleave_dictionaries::<$t>($values, $indices)?) as _)
+    };
+}
+
 ///
 /// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
 ///
@@ -87,6 +96,10 @@ pub fn interleave(
         DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
         DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
         DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
+        DataType::Dictionary(k, _) => downcast_integer! {
+            k.as_ref() => (dict_helper, values, indices),
+            _ => unreachable!("illegal dictionary key type {k}")
+        },
         _ => interleave_fallback(values, indices)
     }
 }
@@ -97,10 +110,8 @@ pub fn interleave(
 struct Interleave<'a, T> {
     /// The input arrays downcast to T
     arrays: Vec<&'a T>,
-    /// The number of nulls in the interleaved output
-    null_count: usize,
     /// The null buffer of the interleaved output
-    nulls: Option<Buffer>,
+    nulls: Option<NullBuffer>,
 }
 
 impl<'a, T: Array + 'static> Interleave<'a, T> {
@@ -114,22 +125,19 @@ impl<'a, T: Array + 'static> Interleave<'a, T> {
             })
             .collect();
 
-        let mut null_count = 0;
-        let nulls = has_nulls.then(|| {
-            let mut builder = BooleanBufferBuilder::new(indices.len());
-            for (a, b) in indices {
-                let v = arrays[*a].is_valid(*b);
-                null_count += !v as usize;
-                builder.append(v)
+        let nulls = match has_nulls {
+            true => {
+                let mut builder = NullBufferBuilder::new(indices.len());
+                for (a, b) in indices {
+                    let v = arrays[*a].is_valid(*b);
+                    builder.append(v)
+                }
+                builder.finish()
             }
-            builder.into()
-        });
+            false => None,
+        };
 
-        Self {
-            arrays,
-            null_count,
-            nulls,
-        }
+        Self { arrays, nulls }
     }
 }
 
@@ -140,20 +148,14 @@ fn interleave_primitive<T: ArrowPrimitiveType>(
 ) -> Result<ArrayRef, ArrowError> {
     let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
 
-    let mut values = BufferBuilder::<T::Native>::new(indices.len());
+    let mut values = Vec::with_capacity(indices.len());
     for (a, b) in indices {
         let v = interleaved.arrays[*a].value(*b);
-        values.append(v)
+        values.push(v)
     }
 
-    let builder = ArrayDataBuilder::new(data_type.clone())
-        .len(indices.len())
-        .add_buffer(values.finish())
-        .null_bit_buffer(interleaved.nulls)
-        .null_count(interleaved.null_count);
-
-    let data = unsafe { builder.build_unchecked() };
-    Ok(Arc::new(PrimitiveArray::<T>::from(data)))
+    let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
+    Ok(Arc::new(array.with_data_type(data_type.clone())))
 }
 
 fn interleave_bytes<T: ByteArrayType>(
@@ -177,15 +179,55 @@ fn interleave_bytes<T: ByteArrayType>(
         values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
     }
 
-    let builder = ArrayDataBuilder::new(T::DATA_TYPE)
-        .len(indices.len())
-        .add_buffer(offsets.finish())
-        .add_buffer(values.into())
-        .null_bit_buffer(interleaved.nulls)
-        .null_count(interleaved.null_count);
+    // Safety: safe by construction
+    let array = unsafe {
+        let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
+        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
+    };
+    Ok(Arc::new(array))
+}
+
+fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
+    arrays: &[&dyn Array],
+    indices: &[(usize, usize)],
+) -> Result<ArrayRef, ArrowError> {
+    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
+    if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
+        return interleave_fallback(arrays, indices);
+    }
+
+    let masks: Vec<_> = dictionaries
+        .iter()
+        .enumerate()
+        .map(|(a_idx, dictionary)| {
+            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
+                MutableBuffer::new_null(dictionary.len()),
+                dictionary.len(),
+            );
+
+            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
+                key_mask.set_bit(*key_idx, true);
+            }
+            key_mask.finish()
+        })
+        .collect();
+
+    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
 
-    let data = unsafe { builder.build_unchecked() };
-    Ok(Arc::new(GenericByteArray::<T>::from(data)))
+    // Recompute keys
+    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
+    for (a, b) in indices {
+        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
+        match old_keys.is_valid(*b) {
+            true => {
+                let old_key = old_keys.values()[*b];
+                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
+            }
+            false => keys.append_null(),
+        }
+    }
+    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
+    Ok(Arc::new(array))
 }
 
 /// Fallback implementation of interleave using [`MutableArrayData`]
@@ -280,6 +322,32 @@ mod tests {
         )
     }
 
+    #[test]
+    fn test_interleave_dictionary() {
+        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
+        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
+
+        // Should not recompute dictionary
+        let values =
+            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)])
+                .unwrap();
+        let v = values.as_dictionary::<Int32Type>();
+        assert_eq!(v.values().len(), 5);
+
+        let vc = v.downcast_dict::<StringArray>().unwrap();
+        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
+        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
+
+        // Should recompute dictionary
+        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
+        let v = values.as_dictionary::<Int32Type>();
+        assert_eq!(v.values().len(), 1);
+
+        let vc = v.downcast_dict::<StringArray>().unwrap();
+        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
+        assert_eq!(&collected, &["c", "c", "c"]);
+    }
+
     #[test]
     fn test_lists() {
         // [[1, 2], null, [3]]
@@ -323,4 +391,25 @@ mod tests {
 
         assert_eq!(v, &expected);
     }
+
+    #[test]
+    fn interleave_sparse_nulls() {
+        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
+        let keys = Int32Array::from_iter_values(0..10);
+        let dict_a = DictionaryArray::new(keys, Arc::new(values));
+        let values = StringArray::new_null(0);
+        let keys = Int32Array::new_null(10);
+        let dict_b = DictionaryArray::new(keys, Arc::new(values));
+
+        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
+        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
+
+        let expected = DictionaryArray::<Int32Type>::from_iter(vec![
+            Some("0"),
+            Some("1"),
+            Some("2"),
+            None,
+        ]);
+        assert_eq!(array.as_ref(), &expected)
+    }
 }
diff --git a/arrow-select/src/lib.rs b/arrow-select/src/lib.rs
index c468e20a51..82f57a6af4 100644
--- a/arrow-select/src/lib.rs
+++ b/arrow-select/src/lib.rs
@@ -18,6 +18,7 @@
 //! Arrow selection kernels
 
 pub mod concat;
+mod dictionary;
 pub mod filter;
 pub mod interleave;
 pub mod nullif;
diff --git a/arrow/benches/concatenate_kernel.rs b/arrow/benches/concatenate_kernel.rs
index 3fff2abd17..2f5b654394 100644
--- a/arrow/benches/concatenate_kernel.rs
+++ b/arrow/benches/concatenate_kernel.rs
@@ -60,6 +60,28 @@ fn add_benchmark(c: &mut Criterion) {
     c.bench_function("concat str nulls 1024", |b| {
         b.iter(|| bench_concat(&v1, &v2))
     });
+
+    let v1 = create_string_array_with_len::<i32>(10, 0.0, 20);
+    let v1 = create_dict_from_values::<Int32Type>(1024, 0.0, &v1);
+    let v2 = create_string_array_with_len::<i32>(10, 0.0, 20);
+    let v2 = create_dict_from_values::<Int32Type>(1024, 0.0, &v2);
+    c.bench_function("concat str_dict 1024", |b| {
+        b.iter(|| bench_concat(&v1, &v2))
+    });
+
+    let v1 = create_string_array_with_len::<i32>(1024, 0.0, 20);
+    let v1 = create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &v1, 10..20);
+    let v2 = create_string_array_with_len::<i32>(1024, 0.0, 20);
+    let v2 = create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &v2, 30..40);
+    c.bench_function("concat str_dict_sparse 1024", |b| {
+        b.iter(|| bench_concat(&v1, &v2))
+    });
+
+    let v1 = create_string_array::<i32>(1024, 0.5);
+    let v2 = create_string_array::<i32>(1024, 0.5);
+    c.bench_function("concat str nulls 1024", |b| {
+        b.iter(|| bench_concat(&v1, &v2))
+    });
 }
 
 criterion_group!(benches, add_benchmark);
diff --git a/arrow/benches/interleave_kernels.rs b/arrow/benches/interleave_kernels.rs
index 2bb430e40b..454d914080 100644
--- a/arrow/benches/interleave_kernels.rs
+++ b/arrow/benches/interleave_kernels.rs
@@ -37,14 +37,21 @@ fn do_bench(
     base: &dyn Array,
     slices: &[Range<usize>],
 ) {
-    let mut rng = seedable_rng();
-
     let arrays: Vec<_> = slices
         .iter()
         .map(|r| base.slice(r.start, r.end - r.start))
         .collect();
     let values: Vec<_> = arrays.iter().map(|x| x.as_ref()).collect();
+    bench_values(
+        c,
+        &format!("interleave {prefix} {len} {slices:?}"),
+        len,
+        &values,
+    );
+}
 
+fn bench_values(c: &mut Criterion, name: &str, len: usize, values: &[&dyn Array]) {
+    let mut rng = seedable_rng();
     let indices: Vec<_> = (0..len)
         .map(|_| {
             let array_idx = rng.gen_range(0..values.len());
@@ -53,8 +60,8 @@ fn do_bench(
         })
         .collect();
 
-    c.bench_function(&format!("interleave {prefix} {len} {slices:?}"), |b| {
-        b.iter(|| criterion::black_box(interleave(&values, &indices).unwrap()))
+    c.bench_function(name, |b| {
+        b.iter(|| criterion::black_box(interleave(values, &indices).unwrap()))
     });
 }
 
@@ -63,12 +70,20 @@ fn add_benchmark(c: &mut Criterion) {
     let i32_opt = create_primitive_array::<Int32Type>(1024, 0.5);
     let string = create_string_array_with_len::<i32>(1024, 0., 20);
     let string_opt = create_string_array_with_len::<i32>(1024, 0.5, 20);
+    let values = create_string_array_with_len::<i32>(10, 0.0, 20);
+    let dict = create_dict_from_values::<Int32Type>(1024, 0.0, &values);
+
+    let values = create_string_array_with_len::<i32>(1024, 0.0, 20);
+    let sparse_dict =
+        create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &values, 10..20);
 
     let cases: &[(&str, &dyn Array)] = &[
         ("i32(0.0)", &i32),
         ("i32(0.5)", &i32_opt),
         ("str(20, 0.0)", &string),
         ("str(20, 0.5)", &string_opt),
+        ("dict(20, 0.0)", &dict),
+        ("dict_sparse(20, 0.0)", &sparse_dict),
     ];
 
     for (prefix, base) in cases {
@@ -83,6 +98,15 @@ fn add_benchmark(c: &mut Criterion) {
             do_bench(c, prefix, *len, *base, slice);
         }
     }
+
+    for len in [100, 1024, 2048] {
+        bench_values(
+            c,
+            &format!("interleave dict_distinct {len}"),
+            len, // size the index list per iteration, matching the benchmark name
+            &[&dict, &sparse_dict],
+        );
+    }
 }
 
 criterion_group!(benches, add_benchmark);
diff --git a/arrow/src/util/bench_util.rs b/arrow/src/util/bench_util.rs
index 9bdc247837..5e5f4c6ee1 100644
--- a/arrow/src/util/bench_util.rs
+++ b/arrow/src/util/bench_util.rs
@@ -29,6 +29,7 @@ use rand::{
     distributions::{Alphanumeric, Distribution, Standard},
     prelude::StdRng,
 };
+use std::ops::Range;
 
 /// Creates an random (but fixed-seeded) array of a given size and null density
 pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
@@ -268,6 +269,24 @@ pub fn create_dict_from_values<K>(
     null_density: f32,
     values: &dyn Array,
 ) -> DictionaryArray<K>
+where
+    K: ArrowDictionaryKeyType,
+    Standard: Distribution<K::Native>,
+    K::Native: SampleUniform,
+{
+    let min_key = K::Native::from_usize(0).unwrap();
+    let max_key = K::Native::from_usize(values.len()).unwrap();
+    create_sparse_dict_from_values(size, null_density, values, min_key..max_key)
+}
+
+/// Creates a random (but fixed-seeded) dictionary array of a given size and null density
+/// with the provided values array and key range
+pub fn create_sparse_dict_from_values<K>(
+    size: usize,
+    null_density: f32,
+    values: &dyn Array,
+    key_range: Range<K::Native>,
+) -> DictionaryArray<K>
 where
     K: ArrowDictionaryKeyType,
     Standard: Distribution<K::Native>,
@@ -279,9 +298,9 @@ where
         Box::new(values.data_type().clone()),
     );
 
-    let min_key = K::Native::from_usize(0).unwrap();
-    let max_key = K::Native::from_usize(values.len()).unwrap();
-    let keys: Buffer = (0..size).map(|_| rng.gen_range(min_key..max_key)).collect();
+    let keys: Buffer = (0..size)
+        .map(|_| rng.gen_range(key_range.clone()))
+        .collect();
 
     let nulls: Option<Buffer> = (null_density != 0.)
         .then(|| (0..size).map(|_| rng.gen_bool(null_density as _)).collect());