Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/04 16:31:57 UTC

[arrow-datafusion] branch master updated: Implement vectorized hashing for DictionaryArray types (#812)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a58c4  Implement vectorized hashing for DictionaryArray types (#812)
a5a58c4 is described below

commit a5a58c4f23720eda63b02a6cad2902b715288db6
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Aug 4 12:31:48 2021 -0400

    Implement vectorized hashing for DictionaryArray types (#812)
    
    * Implement vectorized hashing for DictionaryArray types
    
    * improve comments
    
    * Check is_multicol outside of the loop
---
 datafusion/src/physical_plan/hash_utils.rs | 224 +++++++++++++++++++++++++++--
 1 file changed, 214 insertions(+), 10 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index e937b4e..abfa09a 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -20,13 +20,17 @@
 use crate::error::{DataFusionError, Result};
 use ahash::{CallHasher, RandomState};
 use arrow::array::{
-    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray,
-    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DictionaryArray,
+    Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
+    LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::{
+    ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Int16Type, Int32Type,
+    Int64Type, Int8Type, Schema, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
 };
-use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use crate::logical_plan::JoinType;
 use crate::physical_plan::expressions::Column;
@@ -245,9 +249,60 @@ macro_rules! hash_array_float {
     };
 }
 
-/// Creates hash values for every row, based on the values in the columns
+/// Hash the values in a dictionary array
+fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
+    array: &ArrayRef,
+    random_state: &RandomState,
+    hashes_buffer: &mut Vec<u64>,
+    multi_col: bool,
+) -> Result<()> {
+    let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
+
+    // Hash each dictionary value once, and then use that computed
+    // hash for each key value to avoid a potentially expensive
+    // redundant hashing for large dictionary elements (e.g. strings)
+    let dict_values = Arc::clone(dict_array.values());
+    let mut dict_hashes = vec![0; dict_values.len()];
+    create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
+
+    // combine hash for each index in values
+    if multi_col {
+        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+            if let Some(key) = key {
+                let idx = key
+                    .to_usize()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(format!(
+                            "Can not convert key value {:?} to usize in dictionary of type {:?}",
+                            key, dict_array.data_type()
+                        ))
+                    })?;
+                *hash = combine_hashes(dict_hashes[idx], *hash)
+            } // no update for Null, consistent with other hashes
+        }
+    } else {
+        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+            if let Some(key) = key {
+                let idx = key
+                    .to_usize()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(format!(
+                            "Can not convert key value {:?} to usize in dictionary of type {:?}",
+                            key, dict_array.data_type()
+                        ))
+                    })?;
+                *hash = dict_hashes[idx]
+            } // no update for Null, consistent with other hashes
+        }
+    }
+    Ok(())
+}
+
+/// Creates hash values for every row, based on the values in the
+/// columns.
 ///
-/// This implements so-called "vectorized hashing"
+/// The number of rows to hash is determined by `hashes_buffer.len()`.
+/// `hashes_buffer` should be pre-sized appropriately.
 pub fn create_hashes<'a>(
     arrays: &[ArrayRef],
     random_state: &RandomState,
@@ -438,11 +493,84 @@ pub fn create_hashes<'a>(
                     multi_col
                 );
             }
+            DataType::Dictionary(index_type, _) => match **index_type {
+                DataType::Int8 => {
+                    create_hashes_dictionary::<Int8Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::Int16 => {
+                    create_hashes_dictionary::<Int16Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::Int32 => {
+                    create_hashes_dictionary::<Int32Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::Int64 => {
+                    create_hashes_dictionary::<Int64Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::UInt8 => {
+                    create_hashes_dictionary::<UInt8Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::UInt16 => {
+                    create_hashes_dictionary::<UInt16Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::UInt32 => {
+                    create_hashes_dictionary::<UInt32Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                DataType::UInt64 => {
+                    create_hashes_dictionary::<UInt64Type>(
+                        col,
+                        random_state,
+                        hashes_buffer,
+                        multi_col,
+                    )?;
+                }
+                _ => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported dictionary type in hasher: {}",
+                        col.data_type(),
+                    )))
+                }
+            },
             _ => {
                 // This is internal because we should have caught this before.
-                return Err(DataFusionError::Internal(
-                    "Unsupported data type in hasher".to_string(),
-                ));
+                return Err(DataFusionError::Internal(format!(
+                    "Unsupported data type in hasher: {}",
+                    col.data_type()
+                )));
             }
         }
     }
@@ -453,6 +581,8 @@ pub fn create_hashes<'a>(
 mod tests {
     use std::sync::Arc;
 
+    use arrow::{array::DictionaryArray, datatypes::Int8Type};
+
     use super::*;
 
     fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> Result<()> {
@@ -529,4 +659,78 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn create_hashes_for_dict_arrays() {
+        let strings = vec![Some("foo"), None, Some("bar"), Some("foo"), None];
+
+        let string_array = Arc::new(strings.iter().cloned().collect::<StringArray>());
+        let dict_array = Arc::new(
+            strings
+                .iter()
+                .cloned()
+                .collect::<DictionaryArray<Int8Type>>(),
+        );
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+        let mut string_hashes = vec![0; strings.len()];
+        create_hashes(&[string_array], &random_state, &mut string_hashes).unwrap();
+
+        let mut dict_hashes = vec![0; strings.len()];
+        create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
+
+        // Null values result in a zero hash
+        for (val, hash) in strings.iter().zip(string_hashes.iter()) {
+            match val {
+                Some(_) => assert_ne!(*hash, 0),
+                None => assert_eq!(*hash, 0),
+            }
+        }
+
+        // same logical values should hash to the same hash value
+        assert_eq!(string_hashes, dict_hashes);
+
+        // Same values should map to same hash values
+        assert_eq!(strings[1], strings[4]);
+        assert_eq!(dict_hashes[1], dict_hashes[4]);
+        assert_eq!(strings[0], strings[3]);
+        assert_eq!(dict_hashes[0], dict_hashes[3]);
+
+        // different strings should map to different hash values
+        assert_ne!(strings[0], strings[2]);
+        assert_ne!(dict_hashes[0], dict_hashes[2]);
+    }
+
+    #[test]
+    fn create_multi_column_hash_for_dict_arrays() {
+        let strings1 = vec![Some("foo"), None, Some("bar")];
+        let strings2 = vec![Some("blarg"), Some("blah"), None];
+
+        let string_array = Arc::new(strings1.iter().cloned().collect::<StringArray>());
+        let dict_array = Arc::new(
+            strings2
+                .iter()
+                .cloned()
+                .collect::<DictionaryArray<Int32Type>>(),
+        );
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+        let mut one_col_hashes = vec![0; strings1.len()];
+        create_hashes(&[dict_array.clone()], &random_state, &mut one_col_hashes).unwrap();
+
+        let mut two_col_hashes = vec![0; strings1.len()];
+        create_hashes(
+            &[dict_array, string_array],
+            &random_state,
+            &mut two_col_hashes,
+        )
+        .unwrap();
+
+        assert_eq!(one_col_hashes.len(), 3);
+        assert_eq!(two_col_hashes.len(), 3);
+
+        assert_ne!(one_col_hashes, two_col_hashes);
+    }
 }
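
For readers skimming the diff, the core idea of the change is that each distinct
dictionary value is hashed once, and the per-row hashes are then produced by
indexing into that small table of hashes via the dictionary keys (null rows keep
a hash of 0). The standalone sketch below illustrates that shape only; it is not
DataFusion's implementation. It uses the standard library's DefaultHasher instead
of the ahash-based create_hashes above, so the hash values it produces differ,
and a plain Vec<Option<usize>> stands in for an Arrow key array.

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Hash a single value with the std hasher (a stand-in for DataFusion's
    // ahash-based hashing; only the structure of the computation matters here).
    fn hash_one<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    fn main() {
        // A dictionary-encoded column: a few distinct values plus per-row
        // keys that index into them; None represents a null row.
        let dict_values = ["foo", "bar"];
        let keys: Vec<Option<usize>> = vec![Some(0), None, Some(1), Some(0), None];

        // Hash each distinct dictionary value exactly once ...
        let dict_hashes: Vec<u64> = dict_values.iter().map(hash_one).collect();

        // ... then build the per-row hashes by looking up the precomputed
        // hash for each key, leaving null rows at 0 as the other hash paths do.
        let mut row_hashes = vec![0u64; keys.len()];
        for (row_hash, key) in row_hashes.iter_mut().zip(keys.iter()) {
            if let Some(idx) = key {
                *row_hash = dict_hashes[*idx];
            }
        }

        // Rows 0 and 3 share the key for "foo", so they share a hash;
        // the null rows (1 and 4) keep a hash of 0.
        assert_eq!(row_hashes[0], row_hashes[3]);
        assert_eq!(row_hashes[1], 0);
        assert_eq!(row_hashes[4], 0);
    }

In the actual patch this same loop appears twice in create_hashes_dictionary: the
multi_col branch combines each looked-up hash into the existing row hash via
combine_hashes instead of overwriting it, which is why the branch is checked once
outside the loop.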