You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this text version.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/01 07:49:45 UTC

[arrow-datafusion] branch master updated: Cleanup hash_utils adding support for decimal256 and f16 (#4053)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c92a38fe5 Cleanup hash_utils adding support for decimal256 and f16 (#4053)
c92a38fe5 is described below

commit c92a38fe5b2a17b8f9a1cd29d181b99793bd26fc
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Nov 1 20:49:39 2022 +1300

    Cleanup hash_utils adding support for decimal256 and f16 (#4053)
    
    * Cleanup hash_utils
    
    * Lint
    
    * Clippy
---
 datafusion-cli/Cargo.lock                       |   3 +
 datafusion/core/Cargo.toml                      |   3 +
 datafusion/core/src/physical_plan/hash_utils.rs | 548 ++++--------------------
 3 files changed, 87 insertions(+), 467 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 7c8a7139f..db724780e 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -547,6 +547,8 @@ version = "13.0.0"
 dependencies = [
  "ahash 0.8.0",
  "arrow",
+ "arrow-buffer",
+ "arrow-schema",
  "async-compression",
  "async-trait",
  "bytes",
@@ -561,6 +563,7 @@ dependencies = [
  "flate2",
  "futures",
  "glob",
+ "half",
  "hashbrown",
  "itertools",
  "lazy_static",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f5fbf9974..2764273ce 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -57,6 +57,8 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
 apache-avro = { version = "0.14", optional = true }
 arrow = { version = "25.0.0", features = ["prettyprint"] }
+arrow-buffer = "25.0.0"
+arrow-schema = "25.0.0"
 async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] }
 async-trait = "0.1.41"
 bytes = "1.1"
@@ -72,6 +74,7 @@ datafusion-sql = { path = "../sql", version = "13.0.0" }
 flate2 = "1.0.24"
 futures = "0.3"
 glob = "0.3.0"
+half = { version = "2.1", default-features = false }
 hashbrown = { version = "0.12", features = ["raw"] }
 itertools = "0.10"
 lazy_static = { version = "^1.4.0" }
diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs
index 899008aa4..75f5bab46 100644
--- a/datafusion/core/src/physical_plan/hash_utils.rs
+++ b/datafusion/core/src/physical_plan/hash_utils.rs
@@ -19,17 +19,10 @@
 
 use crate::error::{DataFusionError, Result};
 use ahash::RandomState;
-use arrow::array::{
-    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
-    DictionaryArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
-    Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray,
-    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-};
-use arrow::datatypes::{
-    ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type,
-    Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
-};
+use arrow::array::*;
+use arrow::datatypes::*;
+use arrow::{downcast_dictionary_array, downcast_primitive_array};
+use arrow_buffer::i256;
 use std::sync::Arc;
 
 // Combines two hashes into one hash
@@ -52,199 +45,98 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col:
     }
 }
 
-fn hash_decimal128<'a>(
-    array: &ArrayRef,
+trait HashValue {
+    fn hash_one(&self, state: &RandomState) -> u64;
+}
+
+impl<'a, T: HashValue + ?Sized> HashValue for &'a T {
+    fn hash_one(&self, state: &RandomState) -> u64 {
+        T::hash_one(self, state)
+    }
+}
+
+macro_rules! hash_value {
+    ($($t:ty),+) => {
+        $(impl HashValue for $t {
+            fn hash_one(&self, state: &RandomState) -> u64 {
+                state.hash_one(self)
+            }
+        })+
+    };
+}
+hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64);
+hash_value!(bool, str, [u8]);
+
+macro_rules! hash_float_value {
+    ($($t:ty),+) => {
+        $(impl HashValue for $t {
+            fn hash_one(&self, state: &RandomState) -> u64 {
+                state.hash_one(self.to_le_bytes())
+            }
+        })+
+    };
+}
+hash_float_value!(half::f16, f32, f64);
+
+fn hash_array<T>(
+    array: T,
     random_state: &RandomState,
-    hashes_buffer: &'a mut [u64],
-    mul_col: bool,
-) {
-    let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
+    hashes_buffer: &mut [u64],
+    multi_col: bool,
+) where
+    T: ArrayAccessor,
+    T::Item: HashValue,
+{
     if array.null_count() == 0 {
-        if mul_col {
+        if multi_col {
             for (i, hash) in hashes_buffer.iter_mut().enumerate() {
-                *hash = combine_hashes(random_state.hash_one(&array.value(i)), *hash);
+                *hash = combine_hashes(array.value(i).hash_one(random_state), *hash);
             }
         } else {
             for (i, hash) in hashes_buffer.iter_mut().enumerate() {
-                *hash = random_state.hash_one(&array.value(i));
+                *hash = array.value(i).hash_one(random_state);
             }
         }
-    } else if mul_col {
+    } else if multi_col {
         for (i, hash) in hashes_buffer.iter_mut().enumerate() {
             if !array.is_null(i) {
-                *hash = combine_hashes(random_state.hash_one(&array.value(i)), *hash);
+                *hash = combine_hashes(array.value(i).hash_one(random_state), *hash);
             }
         }
     } else {
         for (i, hash) in hashes_buffer.iter_mut().enumerate() {
             if !array.is_null(i) {
-                *hash = random_state.hash_one(&array.value(i));
+                *hash = array.value(i).hash_one(random_state);
             }
         }
     }
 }
 
-macro_rules! hash_array {
-    ($array_type:ident, $column: ident, $ty: ty, $hashes: ident, $random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    *hash =
-                        combine_hashes($random_state.hash_one(&array.value(i)), *hash);
-                }
-            } else {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    *hash = $random_state.hash_one(&array.value(i));
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    if !array.is_null(i) {
-                        *hash = combine_hashes(
-                            $random_state.hash_one(&array.value(i)),
-                            *hash,
-                        );
-                    }
-                }
-            } else {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    if !array.is_null(i) {
-                        *hash = $random_state.hash_one(&array.value(i));
-                    }
-                }
-            }
-        }
-    };
-}
-
-macro_rules! hash_array_primitive {
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        let values = array.values();
-
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = combine_hashes($random_state.hash_one(value), *hash);
-                }
-            } else {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = $random_state.hash_one(value)
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = combine_hashes($random_state.hash_one(value), *hash);
-                    }
-                }
-            } else {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = $random_state.hash_one(value);
-                    }
-                }
-            }
-        }
-    };
-}
-
-macro_rules! hash_array_float {
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        let values = array.values();
-
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = combine_hashes(
-                        $random_state.hash_one(&$ty::from_le_bytes(value.to_le_bytes())),
-                        *hash,
-                    );
-                }
-            } else {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash =
-                        $random_state.hash_one(&$ty::from_le_bytes(value.to_le_bytes()))
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = combine_hashes(
-                            $random_state
-                                .hash_one(&$ty::from_le_bytes(value.to_le_bytes())),
-                            *hash,
-                        );
-                    }
-                }
-            } else {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = $random_state
-                            .hash_one(&$ty::from_le_bytes(value.to_le_bytes()));
-                    }
-                }
-            }
-        }
-    };
-}
-
 /// Hash the values in a dictionary array
-fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
-    array: &ArrayRef,
+fn hash_dictionary<K: ArrowDictionaryKeyType>(
+    array: &DictionaryArray<K>,
     random_state: &RandomState,
     hashes_buffer: &mut [u64],
     multi_col: bool,
 ) -> Result<()> {
-    let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
     // Hash each dictionary value once, and then use that computed
     // hash for each key value to avoid a potentially expensive
     // redundant hashing for large dictionary elements (e.g. strings)
-    let dict_values = Arc::clone(dict_array.values());
-    let mut dict_hashes = vec![0; dict_values.len()];
-    create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
+    let values = Arc::clone(array.values());
+    let mut dict_hashes = vec![0; values.len()];
+    create_hashes(&[values], random_state, &mut dict_hashes)?;
 
     // combine hash for each index in values
     if multi_col {
-        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+        for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
             if let Some(key) = key {
-                let idx = key
-                    .to_usize()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(format!(
-                            "Can not convert key value {:?} to usize in dictionary of type {:?}",
-                            key, dict_array.data_type()
-                        ))
-                    })?;
-                *hash = combine_hashes(dict_hashes[idx], *hash)
+                *hash = combine_hashes(dict_hashes[key.as_usize()], *hash)
             } // no update for Null, consistent with other hashes
         }
     } else {
-        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+        for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
             if let Some(key) = key {
-                let idx = key
-                    .to_usize()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(format!(
-                            "Can not convert key value {:?} to usize in dictionary of type {:?}",
-                            key, dict_array.data_type()
-                        ))
-                    })?;
-                *hash = dict_hashes[idx]
+                *hash = dict_hashes[key.as_usize()]
             } // no update for Null, consistent with other hashes
         }
     }
@@ -312,309 +204,34 @@ pub fn create_hashes<'a>(
 ) -> Result<&'a mut Vec<u64>> {
     // combine hashes with `combine_hashes` if we have more than 1 column
 
-    use arrow::array::{BinaryArray, LargeBinaryArray};
     let multi_col = arrays.len() > 1;
 
     for col in arrays {
-        match col.data_type() {
-            DataType::Null => {
-                hash_null(random_state, hashes_buffer, multi_col);
+        let array = col.as_ref();
+        downcast_primitive_array! {
+            array => hash_array(array, random_state, hashes_buffer, multi_col),
+            DataType::Null => hash_null(random_state, hashes_buffer, multi_col),
+            DataType::Boolean => hash_array(as_boolean_array(array), random_state, hashes_buffer, multi_col),
+            DataType::Utf8 => hash_array(as_string_array(array), random_state, hashes_buffer, multi_col),
+            DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, multi_col),
+            DataType::Binary => hash_array(as_generic_binary_array::<i32>(array), random_state, hashes_buffer, multi_col),
+            DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array), random_state, hashes_buffer, multi_col),
+            DataType::FixedSizeBinary(_) => {
+                let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
+                hash_array(array, random_state, hashes_buffer, multi_col)
             }
             DataType::Decimal128(_, _) => {
-                hash_decimal128(col, random_state, hashes_buffer, multi_col);
-            }
-            DataType::UInt8 => {
-                hash_array_primitive!(
-                    UInt8Array,
-                    col,
-                    u8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt16 => {
-                hash_array_primitive!(
-                    UInt16Array,
-                    col,
-                    u16,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt32 => {
-                hash_array_primitive!(
-                    UInt32Array,
-                    col,
-                    u32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt64 => {
-                hash_array_primitive!(
-                    UInt64Array,
-                    col,
-                    u64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int8 => {
-                hash_array_primitive!(
-                    Int8Array,
-                    col,
-                    i8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int16 => {
-                hash_array_primitive!(
-                    Int16Array,
-                    col,
-                    i16,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int32 => {
-                hash_array_primitive!(
-                    Int32Array,
-                    col,
-                    i32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
+                let array = as_primitive_array::<Decimal128Type>(array);
+                hash_array(array, random_state, hashes_buffer, multi_col)
             }
-            DataType::Int64 => {
-                hash_array_primitive!(
-                    Int64Array,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
+            DataType::Decimal256(_, _) => {
+                let array = as_primitive_array::<Decimal256Type>(array);
+                hash_array(array, random_state, hashes_buffer, multi_col)
             }
-            DataType::Float32 => {
-                hash_array_float!(
-                    Float32Array,
-                    col,
-                    u32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
+            DataType::Dictionary(_, _) => downcast_dictionary_array! {
+                array => hash_dictionary(array, random_state, hashes_buffer, multi_col)?,
+                _ => unreachable!()
             }
-            DataType::Float64 => {
-                hash_array_float!(
-                    Float64Array,
-                    col,
-                    u64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Second, None) => {
-                hash_array_primitive!(
-                    TimestampSecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, None) => {
-                hash_array_primitive!(
-                    TimestampMillisecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                hash_array_primitive!(
-                    TimestampMicrosecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                hash_array_primitive!(
-                    TimestampNanosecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Date32 => {
-                hash_array_primitive!(
-                    Date32Array,
-                    col,
-                    i32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Date64 => {
-                hash_array_primitive!(
-                    Date64Array,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Boolean => {
-                hash_array!(
-                    BooleanArray,
-                    col,
-                    u8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Utf8 => {
-                hash_array!(
-                    StringArray,
-                    col,
-                    str,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::LargeUtf8 => {
-                hash_array!(
-                    LargeStringArray,
-                    col,
-                    str,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Binary => {
-                hash_array!(
-                    BinaryArray,
-                    col,
-                    &[u8],
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::FixedSizeBinary(_) => {
-                hash_array!(
-                    FixedSizeBinaryArray,
-                    col,
-                    &[u8],
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::LargeBinary => {
-                hash_array!(
-                    LargeBinaryArray,
-                    col,
-                    &[u8],
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Dictionary(index_type, _) => match **index_type {
-                DataType::Int8 => {
-                    create_hashes_dictionary::<Int8Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::Int16 => {
-                    create_hashes_dictionary::<Int16Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::Int32 => {
-                    create_hashes_dictionary::<Int32Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::Int64 => {
-                    create_hashes_dictionary::<Int64Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt8 => {
-                    create_hashes_dictionary::<UInt8Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt16 => {
-                    create_hashes_dictionary::<UInt16Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt32 => {
-                    create_hashes_dictionary::<UInt32Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt64 => {
-                    create_hashes_dictionary::<UInt64Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                _ => {
-                    return Err(DataFusionError::Internal(format!(
-                        "Unsupported dictionary type in hasher hashing: {}",
-                        col.data_type(),
-                    )))
-                }
-            },
             _ => {
                 // This is internal because we should have caught this before.
                 return Err(DataFusionError::Internal(format!(
@@ -630,10 +247,7 @@ pub fn create_hashes<'a>(
 #[cfg(test)]
 mod tests {
     use crate::from_slice::FromSlice;
-    use arrow::{
-        array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray},
-        datatypes::Int8Type,
-    };
+    use arrow::{array::*, datatypes::*};
     use std::sync::Arc;
 
     use super::*;