You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/13 00:37:32 UTC
[arrow-datafusion] branch main updated: Add hash support for PhysicalExpr and PhysicalSortExpr (#6625)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 065dba72e9 Add hash support for PhysicalExpr and PhysicalSortExpr (#6625)
065dba72e9 is described below
commit 065dba72e911e4510afdc86af54fbd6164df4b38
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Jun 13 03:37:27 2023 +0300
Add hash support for PhysicalExpr and PhysicalSortExpr (#6625)
* remove hash dependency from EquivalenceProperties
* Convert OrderedColumn to PhysicalSortExpr
* Convert unit tests to new API.
* Simplifications
* Simplifications
* Add new test
* Update comments, move type definition to common place.
* Update comment
* change function name
* Add hash support for PhysicalExpr and PhysicalSortExpr
* Better commenting
* Address reviews
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/physical-expr/src/equivalence.rs | 96 ++++------------------
datafusion/physical-expr/src/expressions/binary.rs | 8 +-
datafusion/physical-expr/src/expressions/case.rs | 8 +-
datafusion/physical-expr/src/expressions/cast.rs | 9 ++
datafusion/physical-expr/src/expressions/column.rs | 11 +++
.../physical-expr/src/expressions/datetime.rs | 8 +-
.../src/expressions/get_indexed_field.rs | 8 +-
.../physical-expr/src/expressions/in_list.rs | 9 ++
.../physical-expr/src/expressions/is_not_null.rs | 8 +-
.../physical-expr/src/expressions/is_null.rs | 8 +-
datafusion/physical-expr/src/expressions/like.rs | 8 +-
.../physical-expr/src/expressions/literal.rs | 8 +-
.../physical-expr/src/expressions/negative.rs | 8 +-
datafusion/physical-expr/src/expressions/no_op.rs | 8 +-
datafusion/physical-expr/src/expressions/not.rs | 8 +-
.../physical-expr/src/expressions/try_cast.rs | 8 +-
datafusion/physical-expr/src/physical_expr.rs | 39 +++++++++
datafusion/physical-expr/src/scalar_function.rs | 9 ++
datafusion/physical-expr/src/sort_expr.rs | 10 +++
19 files changed, 188 insertions(+), 91 deletions(-)
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 78279851bb..52bde7475b 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -23,7 +23,8 @@ use crate::{
use arrow::datatypes::SchemaRef;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::hash::Hash;
use std::sync::Arc;
/// Represents a collection of [`EquivalentClass`] (equivalences
@@ -39,7 +40,7 @@ pub struct EquivalenceProperties<T = Column> {
schema: SchemaRef,
}
-impl<T: PartialEq + Clone> EquivalenceProperties<T> {
+impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
pub fn new(schema: SchemaRef) -> Self {
EquivalenceProperties {
classes: vec![],
@@ -113,33 +114,6 @@ impl<T: PartialEq + Clone> EquivalenceProperties<T> {
}
}
-/// Remove duplicates inside the `in_data` vector, returned vector would consist of unique entries
-fn deduplicate_vector<T: PartialEq>(in_data: Vec<T>) -> Vec<T> {
- let mut result = vec![];
- for elem in in_data {
- if !result.contains(&elem) {
- result.push(elem);
- }
- }
- result
-}
-
-/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`.
-fn get_entry_position<T: PartialEq>(in_data: &[T], entry: &T) -> Option<usize> {
- in_data.iter().position(|item| item.eq(entry))
-}
-
-/// Remove `entry` for the `in_data`, returns `true` if removal is successful (e.g `entry` is indeed in the `in_data`)
-/// Otherwise return `false`
-fn remove_from_vec<T: PartialEq>(in_data: &mut Vec<T>, entry: &T) -> bool {
- if let Some(idx) = get_entry_position(in_data, entry) {
- in_data.remove(idx);
- true
- } else {
- false
- }
-}
-
// Helper function to calculate column info recursively
fn get_column_indices_helper(
indices: &mut Vec<(usize, String)>,
@@ -187,20 +161,22 @@ pub struct EquivalentClass<T = Column> {
/// First element in the EquivalentClass
head: T,
/// Other equal columns
- others: Vec<T>,
+ others: HashSet<T>,
}
-impl<T: PartialEq + Clone> EquivalentClass<T> {
+impl<T: Eq + Hash + Clone> EquivalentClass<T> {
pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
- let others = deduplicate_vector(others);
- EquivalentClass { head, others }
+ EquivalentClass {
+ head,
+ others: HashSet::from_iter(others),
+ }
}
pub fn head(&self) -> &T {
&self.head
}
- pub fn others(&self) -> &[T] {
+ pub fn others(&self) -> &HashSet<T> {
&self.others
}
@@ -209,24 +185,20 @@ impl<T: PartialEq + Clone> EquivalentClass<T> {
}
pub fn insert(&mut self, col: T) -> bool {
- if self.head != col && !self.others.contains(&col) {
- self.others.push(col);
- true
- } else {
- false
- }
+ self.head != col && self.others.insert(col)
}
pub fn remove(&mut self, col: &T) -> bool {
- let removed = remove_from_vec(&mut self.others, col);
- // If we are removing the head, shift others so that its first entry becomes the new head.
+ let removed = self.others.remove(col);
+ // If we are removing the head, adjust others so that its first entry becomes the new head.
if !removed && *col == self.head {
- let one_col = self.others.first().cloned();
- if let Some(col) = one_col {
- let removed = remove_from_vec(&mut self.others, &col);
+ if let Some(col) = self.others.iter().next().cloned() {
+ let removed = self.others.remove(&col);
self.head = col;
removed
} else {
+ // We don't allow empty equivalence classes, reject removal if one tries removing
+ // the only element in an equivalence class.
false
}
} else {
@@ -556,40 +528,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_deduplicate_vector() -> Result<()> {
- assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]);
- assert_eq!(
- deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]),
- vec![1, 2, 3, 4, 0]
- );
- Ok(())
- }
-
- #[test]
- fn test_get_entry_position() -> Result<()> {
- assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2));
- assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0));
- assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None);
- Ok(())
- }
-
- #[test]
- fn test_remove_from_vec() -> Result<()> {
- let mut in_data = vec![1, 1, 2, 3, 3];
- remove_from_vec(&mut in_data, &5);
- assert_eq!(in_data, vec![1, 1, 2, 3, 3]);
- remove_from_vec(&mut in_data, &2);
- assert_eq!(in_data, vec![1, 1, 3, 3]);
- remove_from_vec(&mut in_data, &2);
- assert_eq!(in_data, vec![1, 1, 3, 3]);
- remove_from_vec(&mut in_data, &3);
- assert_eq!(in_data, vec![1, 1, 3]);
- remove_from_vec(&mut in_data, &3);
- assert_eq!(in_data, vec![1, 1]);
- Ok(())
- }
-
#[test]
fn test_get_column_infos() -> Result<()> {
let expr1 = Arc::new(Column::new("col1", 2)) as _;
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 994159a8a7..a86b8a62b5 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -19,6 +19,7 @@ mod adapter;
mod kernels;
mod kernels_arrow;
+use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use arrow::array::*;
@@ -96,7 +97,7 @@ use datafusion_expr::type_coercion::binary::{
use datafusion_expr::{ColumnarValue, Operator};
/// Binary expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct BinaryExpr {
left: Arc<dyn PhysicalExpr>,
op: Operator,
@@ -837,6 +838,11 @@ impl PhysicalExpr for BinaryExpr {
};
Ok(vec![left, right])
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for BinaryExpr {
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 903ccda62f..91fa9bbb93 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -16,6 +16,7 @@
// under the License.
use std::borrow::Cow;
+use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use crate::expressions::try_cast;
@@ -51,7 +52,7 @@ type WhenThen = (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>);
/// [WHEN ...]
/// [ELSE result]
/// END
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct CaseExpr {
/// Optional base expression that can be compared to literal values in the "when" expressions
expr: Option<Arc<dyn PhysicalExpr>>,
@@ -348,6 +349,11 @@ impl PhysicalExpr for CaseExpr {
)?))
}
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for CaseExpr {
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index 8e4e1b57e8..b5916def86 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -17,6 +17,7 @@
use std::any::Any;
use std::fmt;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::intervals::Interval;
@@ -132,6 +133,14 @@ impl PhysicalExpr for CastExpr {
interval.cast_to(&cast_type, &self.cast_options)?,
)])
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.expr.hash(&mut s);
+ self.cast_type.hash(&mut s);
+ // Add `self.cast_options` when hash is available
+ // https://github.com/apache/arrow-rs/pull/4395
+ }
}
impl PartialEq<dyn Any> for CastExpr {
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index eb2be5ef21..9eca9bf713 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -18,6 +18,7 @@
//! Column expression
use std::any::Any;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::{
@@ -109,6 +110,11 @@ impl PhysicalExpr for Column {
let col_bounds = context.column_boundaries[self.index].clone();
context.with_boundaries(col_bounds)
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for Column {
@@ -191,6 +197,11 @@ impl PhysicalExpr for UnKnownColumn {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for UnKnownColumn {
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index f1933c1d18..4d0ee5cc7d 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -27,12 +27,13 @@ use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::{ColumnarValue, Operator};
use std::any::Any;
use std::fmt::{Display, Formatter};
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar};
/// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct DateTimeIntervalExpr {
lhs: Arc<dyn PhysicalExpr>,
op: Operator,
@@ -185,6 +186,11 @@ impl PhysicalExpr for DateTimeIntervalExpr {
children[1].clone(),
)))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for DateTimeIntervalExpr {
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index c07641796a..090cfe5a6e 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -35,10 +35,11 @@ use datafusion_expr::{
};
use std::convert::TryInto;
use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
/// expression to get a field of a struct array.
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct GetIndexedFieldExpr {
arg: Arc<dyn PhysicalExpr>,
key: ScalarValue,
@@ -153,6 +154,11 @@ impl PhysicalExpr for GetIndexedFieldExpr {
self.key.clone(),
)))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for GetIndexedFieldExpr {
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 3feb728900..0bcddb4ec8 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -20,6 +20,7 @@
use ahash::RandomState;
use std::any::Any;
use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::hash_utils::HashValue;
@@ -330,6 +331,14 @@ impl PhysicalExpr for InListExpr {
self.static_filter.clone(),
)))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.expr.hash(&mut s);
+ self.negated.hash(&mut s);
+ self.list.hash(&mut s);
+ // Add `self.static_filter` when hash is available
+ }
}
impl PartialEq<dyn Any> for InListExpr {
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs
index 32e53e0c1e..da717a517f 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -17,6 +17,7 @@
//! IS NOT NULL expression
+use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use crate::physical_expr::down_cast_any_ref;
@@ -31,7 +32,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
/// IS NOT NULL expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct IsNotNullExpr {
/// The input expression
arg: Arc<dyn PhysicalExpr>,
@@ -91,6 +92,11 @@ impl PhysicalExpr for IsNotNullExpr {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(IsNotNullExpr::new(children[0].clone())))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for IsNotNullExpr {
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs
index 85e111440a..ee7897edd4 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -17,6 +17,7 @@
//! IS NULL expression
+use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use arrow::compute;
@@ -32,7 +33,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
/// IS NULL expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct IsNullExpr {
/// Input expression
arg: Arc<dyn PhysicalExpr>,
@@ -92,6 +93,11 @@ impl PhysicalExpr for IsNullExpr {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(IsNullExpr::new(children[0].clone())))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for IsNullExpr {
diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs
index 456e477a1e..f549613acb 100644
--- a/datafusion/physical-expr/src/expressions/like.rs
+++ b/datafusion/physical-expr/src/expressions/like.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use arrow::{
@@ -35,7 +36,7 @@ use arrow::compute::kernels::comparison::{
};
// Like expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct LikeExpr {
negated: bool,
case_insensitive: bool,
@@ -186,6 +187,11 @@ impl PhysicalExpr for LikeExpr {
fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
context.with_boundaries(None)
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for LikeExpr {
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 013169ccf7..8cb2bd5b95 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -18,6 +18,7 @@
//! Literal expressions for physical operations
use std::any::Any;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::{
@@ -32,7 +33,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, Expr};
/// Represents a literal value
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Literal {
value: ScalarValue,
}
@@ -93,6 +94,11 @@ impl PhysicalExpr for Literal {
Some(1),
)))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for Literal {
diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs
index 0d6aec879e..7f1bd43fec 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -18,6 +18,7 @@
//! Negation (-) expression
use std::any::Any;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::array::ArrayRef;
@@ -52,7 +53,7 @@ macro_rules! compute_op {
}
/// Negative expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct NegativeExpr {
/// Input expression
arg: Arc<dyn PhysicalExpr>,
@@ -128,6 +129,11 @@ impl PhysicalExpr for NegativeExpr {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(NegativeExpr::new(children[0].clone())))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for NegativeExpr {
diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs
index 7c7d8cc897..584d1d6695 100644
--- a/datafusion/physical-expr/src/expressions/no_op.rs
+++ b/datafusion/physical-expr/src/expressions/no_op.rs
@@ -18,6 +18,7 @@
//! NoOp placeholder for physical operations
use std::any::Any;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::{
@@ -33,7 +34,7 @@ use datafusion_expr::ColumnarValue;
/// A place holder expression, can not be evaluated.
///
/// Used in some cases where an `Arc<dyn PhysicalExpr>` is needed, such as `children()`
-#[derive(Debug, PartialEq, Eq, Default)]
+#[derive(Debug, PartialEq, Eq, Default, Hash)]
pub struct NoOp {}
impl NoOp {
@@ -79,6 +80,11 @@ impl PhysicalExpr for NoOp {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for NoOp {
diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs
index bf935aa97e..51cb8235a0 100644
--- a/datafusion/physical-expr/src/expressions/not.rs
+++ b/datafusion/physical-expr/src/expressions/not.rs
@@ -19,6 +19,7 @@
use std::any::Any;
use std::fmt;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::physical_expr::down_cast_any_ref;
@@ -29,7 +30,7 @@ use datafusion_common::{cast::as_boolean_array, DataFusionError, Result, ScalarV
use datafusion_expr::ColumnarValue;
/// Not expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct NotExpr {
/// Input expression
arg: Arc<dyn PhysicalExpr>,
@@ -105,6 +106,11 @@ impl PhysicalExpr for NotExpr {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(NotExpr::new(children[0].clone())))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for NotExpr {
diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs
index bbb29d6fb5..92ffaa1a88 100644
--- a/datafusion/physical-expr/src/expressions/try_cast.rs
+++ b/datafusion/physical-expr/src/expressions/try_cast.rs
@@ -17,6 +17,7 @@
use std::any::Any;
use std::fmt;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::physical_expr::down_cast_any_ref;
@@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
/// TRY_CAST expression casts an expression to a specific data type and retuns NULL on invalid cast
-#[derive(Debug)]
+#[derive(Debug, Hash)]
pub struct TryCastExpr {
/// The expression to cast
expr: Arc<dyn PhysicalExpr>,
@@ -105,6 +106,11 @@ impl PhysicalExpr for TryCastExpr {
self.cast_type.clone(),
)))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.hash(&mut s);
+ }
}
impl PartialEq<dyn Any> for TryCastExpr {
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index d6dd14e8a1..68525920a0 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -33,6 +33,7 @@ use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterato
use crate::intervals::Interval;
use std::any::Any;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
/// Expression that can be evaluated against a RecordBatch
@@ -104,6 +105,44 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
"Not implemented for {self}"
)))
}
+
+ /// Update the hash `state` with this expression requirements from
+ /// [`Hash`].
+ ///
+ /// This method is required to support hashing [`PhysicalExpr`]s. To
+ /// implement it, typically the type implementing
+ /// [`PhysicalExpr`] implements [`Hash`] and
+ /// then the following boiler plate is used:
+ ///
+ /// # Example:
+ /// ```
+ /// // User defined expression that derives Hash
+ /// #[derive(Hash, Debug, PartialEq, Eq)]
+ /// struct MyExpr {
+ /// val: u64
+ /// }
+ ///
+ /// // impl PhysicalExpr {
+ /// // ...
+ /// # impl MyExpr {
+ /// // Boiler plate to call the derived Hash impl
+ /// fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+ /// use std::hash::Hash;
+ /// let mut s = state;
+ /// self.hash(&mut s);
+ /// }
+ /// // }
+ /// # }
+ /// ```
+ /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
+ /// directly because it must remain object safe.
+ fn dyn_hash(&self, _state: &mut dyn Hasher);
+}
+
+impl Hash for dyn PhysicalExpr {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.dyn_hash(state);
+ }
}
/// Shared [`PhysicalExpr`].
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index da47a55aa9..ef771a6784 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -41,6 +41,7 @@ use datafusion_expr::ScalarFunctionImplementation;
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Formatter};
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
/// Physical expression of a scalar function
@@ -162,6 +163,14 @@ impl PhysicalExpr for ScalarFunctionExpr {
self.return_type(),
)))
}
+
+ fn dyn_hash(&self, state: &mut dyn Hasher) {
+ let mut s = state;
+ self.name.hash(&mut s);
+ self.args.hash(&mut s);
+ self.return_type.hash(&mut s);
+ // Add `self.fun` when hash is available
+ }
}
impl PartialEq<dyn Any> for ScalarFunctionExpr {
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index df519551d8..659e8c85e8 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -22,6 +22,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
/// Represents Sort operation for a column in a RecordBatch
@@ -39,6 +40,15 @@ impl PartialEq for PhysicalSortExpr {
}
}
+impl Eq for PhysicalSortExpr {}
+
+impl Hash for PhysicalSortExpr {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.expr.hash(state);
+ self.options.hash(state);
+ }
+}
+
impl std::fmt::Display for PhysicalSortExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{} {}", self.expr, to_str(&self.options))