You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/15 05:06:45 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9116: ARROW-11156: [Rust][DataFusion] Create hashes vectorized in hash join

jorgecarleitao commented on a change in pull request #9116:
URL: https://github.com/apache/arrow/pull/9116#discussion_r557861327



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -604,6 +562,199 @@ fn build_join_indexes(
         }
     }
 }
+use core::hash::BuildHasher;
+
+/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
+/// it when inserting/indexing or regrowing the `HashMap`
+struct IdHasher {
+    hash: u64,
+}
+
+impl Hasher for IdHasher {
+    fn finish(&self) -> u64 {
+        self.hash
+    }
+
+    fn write_u64(&mut self, i: u64) {
+        self.hash = i;
+    }
+
+    fn write(&mut self, _bytes: &[u8]) {
+        unreachable!("IdHasher should only be used for u64 keys")
+    }
+}
+
+#[derive(Debug)]
+struct IdHashBuilder {}
+
+impl BuildHasher for IdHashBuilder {
+    type Hasher = IdHasher;
+
+    fn build_hasher(&self) -> Self::Hasher {
+        IdHasher { hash: 0 }
+    }
+}
+
+// Combines two hashes into one hash
+fn combine_hashes(l: u64, r: u64) -> u64 {
+    let hash = (17 * 37u64).overflowing_add(l).0;
+    hash.overflowing_mul(37).0.overflowing_add(r).0
+}
+
+macro_rules! equal_rows_elem {
+    ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {
+        $l.as_any()
+            .downcast_ref::<$array_type>()
+            .unwrap()
+            .value($left)
+            == $r
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .unwrap()
+                .value($right)
+    };
+}
+
+/// Left and right row have equal values
+fn equal_rows(
+    left: usize,
+    right: usize,
+    left_arrays: &[ArrayRef],
+    right_arrays: &[ArrayRef],
+) -> Result<bool> {
+    let mut err = None;
+    let res = left_arrays
+        .iter()
+        .zip(right_arrays)
+        .all(|(l, r)| match l.data_type() {
+            DataType::Null => true,
+            DataType::Boolean => {
+                equal_rows_elem!(BooleanArray, l, r, left, right)
+            }
+            DataType::Int8 => {
+                equal_rows_elem!(Int8Array, l, r, left, right)
+            }
+            DataType::Int16 => {
+                equal_rows_elem!(Int16Array, l, r, left, right)
+            }
+            DataType::Int32 => {
+                equal_rows_elem!(Int32Array, l, r, left, right)
+            }
+            DataType::Int64 => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::UInt8 => {
+                equal_rows_elem!(UInt8Array, l, r, left, right)
+            }
+            DataType::UInt16 => {
+                equal_rows_elem!(UInt16Array, l, r, left, right)
+            }
+            DataType::UInt32 => {
+                equal_rows_elem!(UInt32Array, l, r, left, right)
+            }
+            DataType::UInt64 => {
+                equal_rows_elem!(UInt64Array, l, r, left, right)
+            }
+            DataType::Timestamp(_, None) => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::Utf8 => {
+                equal_rows_elem!(StringArray, l, r, left, right)
+            }
+            DataType::LargeUtf8 => {
+                equal_rows_elem!(LargeStringArray, l, r, left, right)
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                err = Some(Err(DataFusionError::Internal(
+                    "Unsupported data type in hasher".to_string(),
+                )));
+                false
+            }
+        });
+
+    err.unwrap_or(Ok(res))
+}
+
+macro_rules! hash_array {
+    ($array_type:ident, $column: ident, $f: ident, $hashes: ident, $random_state: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+
+        for (i, hash) in $hashes.iter_mut().enumerate() {
+            let mut hasher = $random_state.build_hasher();
+            hasher.$f(array.value(i));

Review comment:
       A potential simplification here is to use `let values = array.values();` and zip them with `$hashes`.

##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -604,6 +562,199 @@ fn build_join_indexes(
         }
     }
 }
+use core::hash::BuildHasher;
+
+/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
+/// it when inserting/indexing or regrowing the `HashMap`
+struct IdHasher {
+    hash: u64,
+}
+
+impl Hasher for IdHasher {
+    fn finish(&self) -> u64 {
+        self.hash
+    }
+
+    fn write_u64(&mut self, i: u64) {
+        self.hash = i;
+    }
+
+    fn write(&mut self, _bytes: &[u8]) {
+        unreachable!("IdHasher should only be used for u64 keys")
+    }
+}
+
+#[derive(Debug)]
+struct IdHashBuilder {}
+
+impl BuildHasher for IdHashBuilder {
+    type Hasher = IdHasher;
+
+    fn build_hasher(&self) -> Self::Hasher {
+        IdHasher { hash: 0 }
+    }
+}
+
+// Combines two hashes into one hash
+fn combine_hashes(l: u64, r: u64) -> u64 {
+    let hash = (17 * 37u64).overflowing_add(l).0;
+    hash.overflowing_mul(37).0.overflowing_add(r).0
+}
+
+macro_rules! equal_rows_elem {
+    ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {
+        $l.as_any()
+            .downcast_ref::<$array_type>()
+            .unwrap()
+            .value($left)
+            == $r
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .unwrap()
+                .value($right)
+    };
+}
+
+/// Left and right row have equal values
+fn equal_rows(
+    left: usize,
+    right: usize,
+    left_arrays: &[ArrayRef],
+    right_arrays: &[ArrayRef],
+) -> Result<bool> {
+    let mut err = None;
+    let res = left_arrays
+        .iter()
+        .zip(right_arrays)
+        .all(|(l, r)| match l.data_type() {
+            DataType::Null => true,
+            DataType::Boolean => {
+                equal_rows_elem!(BooleanArray, l, r, left, right)
+            }
+            DataType::Int8 => {
+                equal_rows_elem!(Int8Array, l, r, left, right)
+            }
+            DataType::Int16 => {
+                equal_rows_elem!(Int16Array, l, r, left, right)
+            }
+            DataType::Int32 => {
+                equal_rows_elem!(Int32Array, l, r, left, right)
+            }
+            DataType::Int64 => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::UInt8 => {
+                equal_rows_elem!(UInt8Array, l, r, left, right)
+            }
+            DataType::UInt16 => {
+                equal_rows_elem!(UInt16Array, l, r, left, right)
+            }
+            DataType::UInt32 => {
+                equal_rows_elem!(UInt32Array, l, r, left, right)
+            }
+            DataType::UInt64 => {
+                equal_rows_elem!(UInt64Array, l, r, left, right)
+            }
+            DataType::Timestamp(_, None) => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::Utf8 => {
+                equal_rows_elem!(StringArray, l, r, left, right)
+            }
+            DataType::LargeUtf8 => {
+                equal_rows_elem!(LargeStringArray, l, r, left, right)
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                err = Some(Err(DataFusionError::Internal(
+                    "Unsupported data type in hasher".to_string(),
+                )));
+                false
+            }
+        });
+
+    err.unwrap_or(Ok(res))
+}
+
+macro_rules! hash_array {
+    ($array_type:ident, $column: ident, $f: ident, $hashes: ident, $random_state: ident) => {

Review comment:
       IMO we should probably move this to the arrow crate at some point: hashing and equality are operations that need to be implemented together, with shared invariants, to stay consistent. The fact that you had to implement `equal_rows_elem` here already indicates that.

##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -604,6 +562,199 @@ fn build_join_indexes(
         }
     }
 }
+use core::hash::BuildHasher;
+
+/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
+/// it when inserting/indexing or regrowing the `HashMap`
+struct IdHasher {
+    hash: u64,
+}
+
+impl Hasher for IdHasher {
+    fn finish(&self) -> u64 {
+        self.hash
+    }
+
+    fn write_u64(&mut self, i: u64) {
+        self.hash = i;
+    }
+
+    fn write(&mut self, _bytes: &[u8]) {
+        unreachable!("IdHasher should only be used for u64 keys")
+    }
+}
+
+#[derive(Debug)]
+struct IdHashBuilder {}
+
+impl BuildHasher for IdHashBuilder {
+    type Hasher = IdHasher;
+
+    fn build_hasher(&self) -> Self::Hasher {
+        IdHasher { hash: 0 }
+    }
+}
+
+// Combines two hashes into one hash
+fn combine_hashes(l: u64, r: u64) -> u64 {
+    let hash = (17 * 37u64).overflowing_add(l).0;
+    hash.overflowing_mul(37).0.overflowing_add(r).0
+}
+
+macro_rules! equal_rows_elem {
+    ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {
+        $l.as_any()
+            .downcast_ref::<$array_type>()
+            .unwrap()
+            .value($left)
+            == $r
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .unwrap()
+                .value($right)
+    };
+}
+
+/// Left and right row have equal values
+fn equal_rows(
+    left: usize,
+    right: usize,
+    left_arrays: &[ArrayRef],
+    right_arrays: &[ArrayRef],
+) -> Result<bool> {
+    let mut err = None;
+    let res = left_arrays
+        .iter()
+        .zip(right_arrays)
+        .all(|(l, r)| match l.data_type() {
+            DataType::Null => true,
+            DataType::Boolean => {
+                equal_rows_elem!(BooleanArray, l, r, left, right)
+            }
+            DataType::Int8 => {
+                equal_rows_elem!(Int8Array, l, r, left, right)
+            }
+            DataType::Int16 => {
+                equal_rows_elem!(Int16Array, l, r, left, right)
+            }
+            DataType::Int32 => {
+                equal_rows_elem!(Int32Array, l, r, left, right)
+            }
+            DataType::Int64 => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::UInt8 => {
+                equal_rows_elem!(UInt8Array, l, r, left, right)
+            }
+            DataType::UInt16 => {
+                equal_rows_elem!(UInt16Array, l, r, left, right)
+            }
+            DataType::UInt32 => {
+                equal_rows_elem!(UInt32Array, l, r, left, right)
+            }
+            DataType::UInt64 => {
+                equal_rows_elem!(UInt64Array, l, r, left, right)
+            }
+            DataType::Timestamp(_, None) => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::Utf8 => {
+                equal_rows_elem!(StringArray, l, r, left, right)
+            }
+            DataType::LargeUtf8 => {
+                equal_rows_elem!(LargeStringArray, l, r, left, right)
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                err = Some(Err(DataFusionError::Internal(
+                    "Unsupported data type in hasher".to_string(),
+                )));
+                false
+            }
+        });
+
+    err.unwrap_or(Ok(res))
+}
+
+macro_rules! hash_array {
+    ($array_type:ident, $column: ident, $f: ident, $hashes: ident, $random_state: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+
+        for (i, hash) in $hashes.iter_mut().enumerate() {
+            let mut hasher = $random_state.build_hasher();
+            hasher.$f(array.value(i));
+            *hash = combine_hashes(hasher.finish(), *hash);
+        }
+    };
+}
+
+/// Creates hash values for every element in the row based on the values in the columns
+fn create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result<Vec<u64>> {

Review comment:
       This could be an iterator, which would allow it to be plugged directly into the iteration where it is used; that avoids allocating a vector only to iterate over it later.

##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -604,6 +562,199 @@ fn build_join_indexes(
         }
     }
 }
+use core::hash::BuildHasher;
+
+/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
+/// it when inserting/indexing or regrowing the `HashMap`
+struct IdHasher {
+    hash: u64,
+}
+
+impl Hasher for IdHasher {
+    fn finish(&self) -> u64 {
+        self.hash
+    }
+
+    fn write_u64(&mut self, i: u64) {
+        self.hash = i;
+    }
+
+    fn write(&mut self, _bytes: &[u8]) {
+        unreachable!("IdHasher should only be used for u64 keys")
+    }
+}
+
+#[derive(Debug)]
+struct IdHashBuilder {}
+
+impl BuildHasher for IdHashBuilder {
+    type Hasher = IdHasher;
+
+    fn build_hasher(&self) -> Self::Hasher {
+        IdHasher { hash: 0 }
+    }
+}
+
+// Combines two hashes into one hash
+fn combine_hashes(l: u64, r: u64) -> u64 {
+    let hash = (17 * 37u64).overflowing_add(l).0;
+    hash.overflowing_mul(37).0.overflowing_add(r).0
+}
+
+macro_rules! equal_rows_elem {
+    ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {
+        $l.as_any()
+            .downcast_ref::<$array_type>()
+            .unwrap()
+            .value($left)
+            == $r
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .unwrap()
+                .value($right)
+    };
+}
+
+/// Left and right row have equal values
+fn equal_rows(
+    left: usize,
+    right: usize,
+    left_arrays: &[ArrayRef],
+    right_arrays: &[ArrayRef],
+) -> Result<bool> {
+    let mut err = None;
+    let res = left_arrays
+        .iter()
+        .zip(right_arrays)
+        .all(|(l, r)| match l.data_type() {
+            DataType::Null => true,
+            DataType::Boolean => {
+                equal_rows_elem!(BooleanArray, l, r, left, right)
+            }
+            DataType::Int8 => {
+                equal_rows_elem!(Int8Array, l, r, left, right)
+            }
+            DataType::Int16 => {
+                equal_rows_elem!(Int16Array, l, r, left, right)
+            }
+            DataType::Int32 => {
+                equal_rows_elem!(Int32Array, l, r, left, right)
+            }
+            DataType::Int64 => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::UInt8 => {
+                equal_rows_elem!(UInt8Array, l, r, left, right)
+            }
+            DataType::UInt16 => {
+                equal_rows_elem!(UInt16Array, l, r, left, right)
+            }
+            DataType::UInt32 => {
+                equal_rows_elem!(UInt32Array, l, r, left, right)
+            }
+            DataType::UInt64 => {
+                equal_rows_elem!(UInt64Array, l, r, left, right)
+            }
+            DataType::Timestamp(_, None) => {
+                equal_rows_elem!(Int64Array, l, r, left, right)
+            }
+            DataType::Utf8 => {
+                equal_rows_elem!(StringArray, l, r, left, right)
+            }
+            DataType::LargeUtf8 => {
+                equal_rows_elem!(LargeStringArray, l, r, left, right)
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                err = Some(Err(DataFusionError::Internal(
+                    "Unsupported data type in hasher".to_string(),
+                )));
+                false
+            }
+        });
+
+    err.unwrap_or(Ok(res))
+}
+
+macro_rules! hash_array {
+    ($array_type:ident, $column: ident, $f: ident, $hashes: ident, $random_state: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+
+        for (i, hash) in $hashes.iter_mut().enumerate() {
+            let mut hasher = $random_state.build_hasher();
+            hasher.$f(array.value(i));

Review comment:
       Isn't this ignoring nulls? Maybe that is intentional because nulls are discarded in joins? Even if so, isn't this a problem, since we are still reading the value of a null slot?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org