You are viewing a plain-text version of this content. The link to the canonical version did not survive conversion to plain text.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/01 07:49:45 UTC
[arrow-datafusion] branch master updated: Cleanup hash_utils adding support for decimal256 and f16 (#4053)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c92a38fe5 Cleanup hash_utils adding support for decimal256 and f16 (#4053)
c92a38fe5 is described below
commit c92a38fe5b2a17b8f9a1cd29d181b99793bd26fc
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Nov 1 20:49:39 2022 +1300
Cleanup hash_utils adding support for decimal256 and f16 (#4053)
* Cleanup hash_utils
* Lint
* Clippy
---
datafusion-cli/Cargo.lock | 3 +
datafusion/core/Cargo.toml | 3 +
datafusion/core/src/physical_plan/hash_utils.rs | 548 ++++--------------------
3 files changed, 87 insertions(+), 467 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 7c8a7139f..db724780e 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -547,6 +547,8 @@ version = "13.0.0"
dependencies = [
"ahash 0.8.0",
"arrow",
+ "arrow-buffer",
+ "arrow-schema",
"async-compression",
"async-trait",
"bytes",
@@ -561,6 +563,7 @@ dependencies = [
"flate2",
"futures",
"glob",
+ "half",
"hashbrown",
"itertools",
"lazy_static",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f5fbf9974..2764273ce 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -57,6 +57,8 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "25.0.0", features = ["prettyprint"] }
+arrow-buffer = "25.0.0"
+arrow-schema = "25.0.0"
async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] }
async-trait = "0.1.41"
bytes = "1.1"
@@ -72,6 +74,7 @@ datafusion-sql = { path = "../sql", version = "13.0.0" }
flate2 = "1.0.24"
futures = "0.3"
glob = "0.3.0"
+half = { version = "2.1", default-features = false }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = "0.10"
lazy_static = { version = "^1.4.0" }
diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs
index 899008aa4..75f5bab46 100644
--- a/datafusion/core/src/physical_plan/hash_utils.rs
+++ b/datafusion/core/src/physical_plan/hash_utils.rs
@@ -19,17 +19,10 @@
use crate::error::{DataFusionError, Result};
use ahash::RandomState;
-use arrow::array::{
- Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
- DictionaryArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
- Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray,
- TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
- TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-};
-use arrow::datatypes::{
- ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type,
- Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
-};
+use arrow::array::*;
+use arrow::datatypes::*;
+use arrow::{downcast_dictionary_array, downcast_primitive_array};
+use arrow_buffer::i256;
use std::sync::Arc;
// Combines two hashes into one hash
@@ -52,199 +45,98 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col:
}
}
-fn hash_decimal128<'a>(
- array: &ArrayRef,
+trait HashValue {
+ fn hash_one(&self, state: &RandomState) -> u64;
+}
+
+impl<'a, T: HashValue + ?Sized> HashValue for &'a T {
+ fn hash_one(&self, state: &RandomState) -> u64 {
+ T::hash_one(self, state)
+ }
+}
+
+macro_rules! hash_value {
+ ($($t:ty),+) => {
+ $(impl HashValue for $t {
+ fn hash_one(&self, state: &RandomState) -> u64 {
+ state.hash_one(self)
+ }
+ })+
+ };
+}
+hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64);
+hash_value!(bool, str, [u8]);
+
+macro_rules! hash_float_value {
+ ($($t:ty),+) => {
+ $(impl HashValue for $t {
+ fn hash_one(&self, state: &RandomState) -> u64 {
+ state.hash_one(self.to_le_bytes())
+ }
+ })+
+ };
+}
+hash_float_value!(half::f16, f32, f64);
+
+fn hash_array<T>(
+ array: T,
random_state: &RandomState,
- hashes_buffer: &'a mut [u64],
- mul_col: bool,
-) {
- let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ hashes_buffer: &mut [u64],
+ multi_col: bool,
+) where
+ T: ArrayAccessor,
+ T::Item: HashValue,
+{
if array.null_count() == 0 {
- if mul_col {
+ if multi_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
- *hash = combine_hashes(random_state.hash_one(&array.value(i)), *hash);
+ *hash = combine_hashes(array.value(i).hash_one(random_state), *hash);
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
- *hash = random_state.hash_one(&array.value(i));
+ *hash = array.value(i).hash_one(random_state);
}
}
- } else if mul_col {
+ } else if multi_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
- *hash = combine_hashes(random_state.hash_one(&array.value(i)), *hash);
+ *hash = combine_hashes(array.value(i).hash_one(random_state), *hash);
}
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
- *hash = random_state.hash_one(&array.value(i));
+ *hash = array.value(i).hash_one(random_state);
}
}
}
}
-macro_rules! hash_array {
- ($array_type:ident, $column: ident, $ty: ty, $hashes: ident, $random_state: ident, $multi_col: ident) => {
- let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
- if array.null_count() == 0 {
- if $multi_col {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash =
- combine_hashes($random_state.hash_one(&array.value(i)), *hash);
- }
- } else {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash = $random_state.hash_one(&array.value(i));
- }
- }
- } else {
- if $multi_col {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- if !array.is_null(i) {
- *hash = combine_hashes(
- $random_state.hash_one(&array.value(i)),
- *hash,
- );
- }
- }
- } else {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- if !array.is_null(i) {
- *hash = $random_state.hash_one(&array.value(i));
- }
- }
- }
- }
- };
-}
-
-macro_rules! hash_array_primitive {
- ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
- let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
- let values = array.values();
-
- if array.null_count() == 0 {
- if $multi_col {
- for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
- *hash = combine_hashes($random_state.hash_one(value), *hash);
- }
- } else {
- for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
- *hash = $random_state.hash_one(value)
- }
- }
- } else {
- if $multi_col {
- for (i, (hash, value)) in
- $hashes.iter_mut().zip(values.iter()).enumerate()
- {
- if !array.is_null(i) {
- *hash = combine_hashes($random_state.hash_one(value), *hash);
- }
- }
- } else {
- for (i, (hash, value)) in
- $hashes.iter_mut().zip(values.iter()).enumerate()
- {
- if !array.is_null(i) {
- *hash = $random_state.hash_one(value);
- }
- }
- }
- }
- };
-}
-
-macro_rules! hash_array_float {
- ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
- let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
- let values = array.values();
-
- if array.null_count() == 0 {
- if $multi_col {
- for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
- *hash = combine_hashes(
- $random_state.hash_one(&$ty::from_le_bytes(value.to_le_bytes())),
- *hash,
- );
- }
- } else {
- for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
- *hash =
- $random_state.hash_one(&$ty::from_le_bytes(value.to_le_bytes()))
- }
- }
- } else {
- if $multi_col {
- for (i, (hash, value)) in
- $hashes.iter_mut().zip(values.iter()).enumerate()
- {
- if !array.is_null(i) {
- *hash = combine_hashes(
- $random_state
- .hash_one(&$ty::from_le_bytes(value.to_le_bytes())),
- *hash,
- );
- }
- }
- } else {
- for (i, (hash, value)) in
- $hashes.iter_mut().zip(values.iter()).enumerate()
- {
- if !array.is_null(i) {
- *hash = $random_state
- .hash_one(&$ty::from_le_bytes(value.to_le_bytes()));
- }
- }
- }
- }
- };
-}
-
/// Hash the values in a dictionary array
-fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
- array: &ArrayRef,
+fn hash_dictionary<K: ArrowDictionaryKeyType>(
+ array: &DictionaryArray<K>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
multi_col: bool,
) -> Result<()> {
- let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
// redundant hashing for large dictionary elements (e.g. strings)
- let dict_values = Arc::clone(dict_array.values());
- let mut dict_hashes = vec![0; dict_values.len()];
- create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
+ let values = Arc::clone(array.values());
+ let mut dict_hashes = vec![0; values.len()];
+ create_hashes(&[values], random_state, &mut dict_hashes)?;
// combine hash for each index in values
if multi_col {
- for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+ for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
- let idx = key
- .to_usize()
- .ok_or_else(|| {
- DataFusionError::Internal(format!(
- "Can not convert key value {:?} to usize in dictionary of type {:?}",
- key, dict_array.data_type()
- ))
- })?;
- *hash = combine_hashes(dict_hashes[idx], *hash)
+ *hash = combine_hashes(dict_hashes[key.as_usize()], *hash)
} // no update for Null, consistent with other hashes
}
} else {
- for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+ for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
- let idx = key
- .to_usize()
- .ok_or_else(|| {
- DataFusionError::Internal(format!(
- "Can not convert key value {:?} to usize in dictionary of type {:?}",
- key, dict_array.data_type()
- ))
- })?;
- *hash = dict_hashes[idx]
+ *hash = dict_hashes[key.as_usize()]
} // no update for Null, consistent with other hashes
}
}
@@ -312,309 +204,34 @@ pub fn create_hashes<'a>(
) -> Result<&'a mut Vec<u64>> {
// combine hashes with `combine_hashes` if we have more than 1 column
- use arrow::array::{BinaryArray, LargeBinaryArray};
let multi_col = arrays.len() > 1;
for col in arrays {
- match col.data_type() {
- DataType::Null => {
- hash_null(random_state, hashes_buffer, multi_col);
+ let array = col.as_ref();
+ downcast_primitive_array! {
+ array => hash_array(array, random_state, hashes_buffer, multi_col),
+ DataType::Null => hash_null(random_state, hashes_buffer, multi_col),
+ DataType::Boolean => hash_array(as_boolean_array(array), random_state, hashes_buffer, multi_col),
+ DataType::Utf8 => hash_array(as_string_array(array), random_state, hashes_buffer, multi_col),
+ DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, multi_col),
+ DataType::Binary => hash_array(as_generic_binary_array::<i32>(array), random_state, hashes_buffer, multi_col),
+ DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array), random_state, hashes_buffer, multi_col),
+ DataType::FixedSizeBinary(_) => {
+ let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
+ hash_array(array, random_state, hashes_buffer, multi_col)
}
DataType::Decimal128(_, _) => {
- hash_decimal128(col, random_state, hashes_buffer, multi_col);
- }
- DataType::UInt8 => {
- hash_array_primitive!(
- UInt8Array,
- col,
- u8,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::UInt16 => {
- hash_array_primitive!(
- UInt16Array,
- col,
- u16,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::UInt32 => {
- hash_array_primitive!(
- UInt32Array,
- col,
- u32,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::UInt64 => {
- hash_array_primitive!(
- UInt64Array,
- col,
- u64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Int8 => {
- hash_array_primitive!(
- Int8Array,
- col,
- i8,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Int16 => {
- hash_array_primitive!(
- Int16Array,
- col,
- i16,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Int32 => {
- hash_array_primitive!(
- Int32Array,
- col,
- i32,
- hashes_buffer,
- random_state,
- multi_col
- );
+ let array = as_primitive_array::<Decimal128Type>(array);
+ hash_array(array, random_state, hashes_buffer, multi_col)
}
- DataType::Int64 => {
- hash_array_primitive!(
- Int64Array,
- col,
- i64,
- hashes_buffer,
- random_state,
- multi_col
- );
+ DataType::Decimal256(_, _) => {
+ let array = as_primitive_array::<Decimal256Type>(array);
+ hash_array(array, random_state, hashes_buffer, multi_col)
}
- DataType::Float32 => {
- hash_array_float!(
- Float32Array,
- col,
- u32,
- hashes_buffer,
- random_state,
- multi_col
- );
+ DataType::Dictionary(_, _) => downcast_dictionary_array! {
+ array => hash_dictionary(array, random_state, hashes_buffer, multi_col)?,
+ _ => unreachable!()
}
- DataType::Float64 => {
- hash_array_float!(
- Float64Array,
- col,
- u64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Timestamp(TimeUnit::Second, None) => {
- hash_array_primitive!(
- TimestampSecondArray,
- col,
- i64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Timestamp(TimeUnit::Millisecond, None) => {
- hash_array_primitive!(
- TimestampMillisecondArray,
- col,
- i64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Timestamp(TimeUnit::Microsecond, None) => {
- hash_array_primitive!(
- TimestampMicrosecondArray,
- col,
- i64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- hash_array_primitive!(
- TimestampNanosecondArray,
- col,
- i64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Date32 => {
- hash_array_primitive!(
- Date32Array,
- col,
- i32,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Date64 => {
- hash_array_primitive!(
- Date64Array,
- col,
- i64,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Boolean => {
- hash_array!(
- BooleanArray,
- col,
- u8,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Utf8 => {
- hash_array!(
- StringArray,
- col,
- str,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::LargeUtf8 => {
- hash_array!(
- LargeStringArray,
- col,
- str,
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Binary => {
- hash_array!(
- BinaryArray,
- col,
- &[u8],
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::FixedSizeBinary(_) => {
- hash_array!(
- FixedSizeBinaryArray,
- col,
- &[u8],
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::LargeBinary => {
- hash_array!(
- LargeBinaryArray,
- col,
- &[u8],
- hashes_buffer,
- random_state,
- multi_col
- );
- }
- DataType::Dictionary(index_type, _) => match **index_type {
- DataType::Int8 => {
- create_hashes_dictionary::<Int8Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::Int16 => {
- create_hashes_dictionary::<Int16Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::Int32 => {
- create_hashes_dictionary::<Int32Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::Int64 => {
- create_hashes_dictionary::<Int64Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::UInt8 => {
- create_hashes_dictionary::<UInt8Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::UInt16 => {
- create_hashes_dictionary::<UInt16Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::UInt32 => {
- create_hashes_dictionary::<UInt32Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- DataType::UInt64 => {
- create_hashes_dictionary::<UInt64Type>(
- col,
- random_state,
- hashes_buffer,
- multi_col,
- )?;
- }
- _ => {
- return Err(DataFusionError::Internal(format!(
- "Unsupported dictionary type in hasher hashing: {}",
- col.data_type(),
- )))
- }
- },
_ => {
// This is internal because we should have caught this before.
return Err(DataFusionError::Internal(format!(
@@ -630,10 +247,7 @@ pub fn create_hashes<'a>(
#[cfg(test)]
mod tests {
use crate::from_slice::FromSlice;
- use arrow::{
- array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray},
- datatypes::Int8Type,
- };
+ use arrow::{array::*, datatypes::*};
use std::sync::Arc;
use super::*;