You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/13 00:37:32 UTC

[arrow-datafusion] branch main updated: Add hash support for PhysicalExpr and PhysicalSortExpr (#6625)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 065dba72e9 Add hash support for PhysicalExpr and PhysicalSortExpr (#6625)
065dba72e9 is described below

commit 065dba72e911e4510afdc86af54fbd6164df4b38
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Jun 13 03:37:27 2023 +0300

    Add hash support for PhysicalExpr and PhysicalSortExpr (#6625)
    
    * remove hash dependency from EquivalenceProperties
    
    * Convert OrderedColumn to PhysicalSortExpr
    
    * Convert unit tests to new API.
    
    * Simplifications
    
    * Simplifications
    
    * Add new test
    
    * Update comments, move type definition to common place.
    
    * Update comment
    
    * change function name
    
    * Add hash support for PhysicalExpr and PhysicalSortExpr
    
    * Better commenting
    
    * Address reviews
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/physical-expr/src/equivalence.rs        | 96 ++++------------------
 datafusion/physical-expr/src/expressions/binary.rs |  8 +-
 datafusion/physical-expr/src/expressions/case.rs   |  8 +-
 datafusion/physical-expr/src/expressions/cast.rs   |  9 ++
 datafusion/physical-expr/src/expressions/column.rs | 11 +++
 .../physical-expr/src/expressions/datetime.rs      |  8 +-
 .../src/expressions/get_indexed_field.rs           |  8 +-
 .../physical-expr/src/expressions/in_list.rs       |  9 ++
 .../physical-expr/src/expressions/is_not_null.rs   |  8 +-
 .../physical-expr/src/expressions/is_null.rs       |  8 +-
 datafusion/physical-expr/src/expressions/like.rs   |  8 +-
 .../physical-expr/src/expressions/literal.rs       |  8 +-
 .../physical-expr/src/expressions/negative.rs      |  8 +-
 datafusion/physical-expr/src/expressions/no_op.rs  |  8 +-
 datafusion/physical-expr/src/expressions/not.rs    |  8 +-
 .../physical-expr/src/expressions/try_cast.rs      |  8 +-
 datafusion/physical-expr/src/physical_expr.rs      | 39 +++++++++
 datafusion/physical-expr/src/scalar_function.rs    |  9 ++
 datafusion/physical-expr/src/sort_expr.rs          | 10 +++
 19 files changed, 188 insertions(+), 91 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 78279851bb..52bde7475b 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -23,7 +23,8 @@ use crate::{
 
 use arrow::datatypes::SchemaRef;
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::hash::Hash;
 use std::sync::Arc;
 
 /// Represents a collection of [`EquivalentClass`] (equivalences
@@ -39,7 +40,7 @@ pub struct EquivalenceProperties<T = Column> {
     schema: SchemaRef,
 }
 
-impl<T: PartialEq + Clone> EquivalenceProperties<T> {
+impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
     pub fn new(schema: SchemaRef) -> Self {
         EquivalenceProperties {
             classes: vec![],
@@ -113,33 +114,6 @@ impl<T: PartialEq + Clone> EquivalenceProperties<T> {
     }
 }
 
-/// Remove duplicates inside the `in_data` vector, returned vector would consist of unique entries
-fn deduplicate_vector<T: PartialEq>(in_data: Vec<T>) -> Vec<T> {
-    let mut result = vec![];
-    for elem in in_data {
-        if !result.contains(&elem) {
-            result.push(elem);
-        }
-    }
-    result
-}
-
-/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`.
-fn get_entry_position<T: PartialEq>(in_data: &[T], entry: &T) -> Option<usize> {
-    in_data.iter().position(|item| item.eq(entry))
-}
-
-/// Remove `entry` for the `in_data`, returns `true` if removal is successful (e.g `entry` is indeed in the `in_data`)
-/// Otherwise return `false`
-fn remove_from_vec<T: PartialEq>(in_data: &mut Vec<T>, entry: &T) -> bool {
-    if let Some(idx) = get_entry_position(in_data, entry) {
-        in_data.remove(idx);
-        true
-    } else {
-        false
-    }
-}
-
 // Helper function to calculate column info recursively
 fn get_column_indices_helper(
     indices: &mut Vec<(usize, String)>,
@@ -187,20 +161,22 @@ pub struct EquivalentClass<T = Column> {
     /// First element in the EquivalentClass
     head: T,
     /// Other equal columns
-    others: Vec<T>,
+    others: HashSet<T>,
 }
 
-impl<T: PartialEq + Clone> EquivalentClass<T> {
+impl<T: Eq + Hash + Clone> EquivalentClass<T> {
     pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
-        let others = deduplicate_vector(others);
-        EquivalentClass { head, others }
+        EquivalentClass {
+            head,
+            others: HashSet::from_iter(others),
+        }
     }
 
     pub fn head(&self) -> &T {
         &self.head
     }
 
-    pub fn others(&self) -> &[T] {
+    pub fn others(&self) -> &HashSet<T> {
         &self.others
     }
 
@@ -209,24 +185,20 @@ impl<T: PartialEq + Clone> EquivalentClass<T> {
     }
 
     pub fn insert(&mut self, col: T) -> bool {
-        if self.head != col && !self.others.contains(&col) {
-            self.others.push(col);
-            true
-        } else {
-            false
-        }
+        self.head != col && self.others.insert(col)
     }
 
     pub fn remove(&mut self, col: &T) -> bool {
-        let removed = remove_from_vec(&mut self.others, col);
-        // If we are removing the head, shift others so that its first entry becomes the new head.
+        let removed = self.others.remove(col);
+        // If we are removing the head, promote an arbitrary entry of others to be the new head.
         if !removed && *col == self.head {
-            let one_col = self.others.first().cloned();
-            if let Some(col) = one_col {
-                let removed = remove_from_vec(&mut self.others, &col);
+            if let Some(col) = self.others.iter().next().cloned() {
+                let removed = self.others.remove(&col);
                 self.head = col;
                 removed
             } else {
+                // We don't allow empty equivalence classes: reject the removal if it would
+                // remove the only element of the equivalence class.
                 false
             }
         } else {
@@ -556,40 +528,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_deduplicate_vector() -> Result<()> {
-        assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]);
-        assert_eq!(
-            deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]),
-            vec![1, 2, 3, 4, 0]
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn test_get_entry_position() -> Result<()> {
-        assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2));
-        assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0));
-        assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None);
-        Ok(())
-    }
-
-    #[test]
-    fn test_remove_from_vec() -> Result<()> {
-        let mut in_data = vec![1, 1, 2, 3, 3];
-        remove_from_vec(&mut in_data, &5);
-        assert_eq!(in_data, vec![1, 1, 2, 3, 3]);
-        remove_from_vec(&mut in_data, &2);
-        assert_eq!(in_data, vec![1, 1, 3, 3]);
-        remove_from_vec(&mut in_data, &2);
-        assert_eq!(in_data, vec![1, 1, 3, 3]);
-        remove_from_vec(&mut in_data, &3);
-        assert_eq!(in_data, vec![1, 1, 3]);
-        remove_from_vec(&mut in_data, &3);
-        assert_eq!(in_data, vec![1, 1]);
-        Ok(())
-    }
-
     #[test]
     fn test_get_column_infos() -> Result<()> {
         let expr1 = Arc::new(Column::new("col1", 2)) as _;
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 994159a8a7..a86b8a62b5 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -19,6 +19,7 @@ mod adapter;
 mod kernels;
 mod kernels_arrow;
 
+use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
 use arrow::array::*;
@@ -96,7 +97,7 @@ use datafusion_expr::type_coercion::binary::{
 use datafusion_expr::{ColumnarValue, Operator};
 
 /// Binary expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct BinaryExpr {
     left: Arc<dyn PhysicalExpr>,
     op: Operator,
@@ -837,6 +838,11 @@ impl PhysicalExpr for BinaryExpr {
         };
         Ok(vec![left, right])
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for BinaryExpr {
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 903ccda62f..91fa9bbb93 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::borrow::Cow;
+use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
 use crate::expressions::try_cast;
@@ -51,7 +52,7 @@ type WhenThen = (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>);
 ///     [WHEN ...]
 ///     [ELSE result]
 /// END
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct CaseExpr {
     /// Optional base expression that can be compared to literal values in the "when" expressions
     expr: Option<Arc<dyn PhysicalExpr>>,
@@ -348,6 +349,11 @@ impl PhysicalExpr for CaseExpr {
             )?))
         }
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for CaseExpr {
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index 8e4e1b57e8..b5916def86 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -17,6 +17,7 @@
 
 use std::any::Any;
 use std::fmt;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::intervals::Interval;
@@ -132,6 +133,14 @@ impl PhysicalExpr for CastExpr {
             interval.cast_to(&cast_type, &self.cast_options)?,
         )])
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.expr.hash(&mut s);
+        self.cast_type.hash(&mut s);
+        // Add `self.cast_options` when hash is available
+        // https://github.com/apache/arrow-rs/pull/4395
+    }
 }
 
 impl PartialEq<dyn Any> for CastExpr {
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index eb2be5ef21..9eca9bf713 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -18,6 +18,7 @@
 //! Column expression
 
 use std::any::Any;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use arrow::{
@@ -109,6 +110,11 @@ impl PhysicalExpr for Column {
         let col_bounds = context.column_boundaries[self.index].clone();
         context.with_boundaries(col_bounds)
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for Column {
@@ -191,6 +197,11 @@ impl PhysicalExpr for UnKnownColumn {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(self)
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for UnKnownColumn {
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index f1933c1d18..4d0ee5cc7d 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -27,12 +27,13 @@ use datafusion_expr::type_coercion::binary::get_result_type;
 use datafusion_expr::{ColumnarValue, Operator};
 use std::any::Any;
 use std::fmt::{Display, Formatter};
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar};
 
 /// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct DateTimeIntervalExpr {
     lhs: Arc<dyn PhysicalExpr>,
     op: Operator,
@@ -185,6 +186,11 @@ impl PhysicalExpr for DateTimeIntervalExpr {
             children[1].clone(),
         )))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for DateTimeIntervalExpr {
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index c07641796a..090cfe5a6e 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -35,10 +35,11 @@ use datafusion_expr::{
 };
 use std::convert::TryInto;
 use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
 /// expression to get a field of a struct array.
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct GetIndexedFieldExpr {
     arg: Arc<dyn PhysicalExpr>,
     key: ScalarValue,
@@ -153,6 +154,11 @@ impl PhysicalExpr for GetIndexedFieldExpr {
             self.key.clone(),
         )))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for GetIndexedFieldExpr {
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 3feb728900..0bcddb4ec8 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -20,6 +20,7 @@
 use ahash::RandomState;
 use std::any::Any;
 use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::hash_utils::HashValue;
@@ -330,6 +331,14 @@ impl PhysicalExpr for InListExpr {
             self.static_filter.clone(),
         )))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.expr.hash(&mut s);
+        self.negated.hash(&mut s);
+        self.list.hash(&mut s);
+        // Add `self.static_filter` when hash is available
+    }
 }
 
 impl PartialEq<dyn Any> for InListExpr {
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs
index 32e53e0c1e..da717a517f 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -17,6 +17,7 @@
 
 //! IS NOT NULL expression
 
+use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
 use crate::physical_expr::down_cast_any_ref;
@@ -31,7 +32,7 @@ use datafusion_common::ScalarValue;
 use datafusion_expr::ColumnarValue;
 
 /// IS NOT NULL expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct IsNotNullExpr {
     /// The input expression
     arg: Arc<dyn PhysicalExpr>,
@@ -91,6 +92,11 @@ impl PhysicalExpr for IsNotNullExpr {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(IsNotNullExpr::new(children[0].clone())))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for IsNotNullExpr {
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs
index 85e111440a..ee7897edd4 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -17,6 +17,7 @@
 
 //! IS NULL expression
 
+use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
 use arrow::compute;
@@ -32,7 +33,7 @@ use datafusion_common::ScalarValue;
 use datafusion_expr::ColumnarValue;
 
 /// IS NULL expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct IsNullExpr {
     /// Input expression
     arg: Arc<dyn PhysicalExpr>,
@@ -92,6 +93,11 @@ impl PhysicalExpr for IsNullExpr {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(IsNullExpr::new(children[0].clone())))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for IsNullExpr {
diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs
index 456e477a1e..f549613acb 100644
--- a/datafusion/physical-expr/src/expressions/like.rs
+++ b/datafusion/physical-expr/src/expressions/like.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
 use arrow::{
@@ -35,7 +36,7 @@ use arrow::compute::kernels::comparison::{
 };
 
 // Like expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct LikeExpr {
     negated: bool,
     case_insensitive: bool,
@@ -186,6 +187,11 @@ impl PhysicalExpr for LikeExpr {
     fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
         context.with_boundaries(None)
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for LikeExpr {
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 013169ccf7..8cb2bd5b95 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -18,6 +18,7 @@
 //! Literal expressions for physical operations
 
 use std::any::Any;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use arrow::{
@@ -32,7 +33,7 @@ use datafusion_common::ScalarValue;
 use datafusion_expr::{ColumnarValue, Expr};
 
 /// Represents a literal value
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Hash)]
 pub struct Literal {
     value: ScalarValue,
 }
@@ -93,6 +94,11 @@ impl PhysicalExpr for Literal {
             Some(1),
         )))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for Literal {
diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs
index 0d6aec879e..7f1bd43fec 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -18,6 +18,7 @@
 //! Negation (-) expression
 
 use std::any::Any;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use arrow::array::ArrayRef;
@@ -52,7 +53,7 @@ macro_rules! compute_op {
 }
 
 /// Negative expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct NegativeExpr {
     /// Input expression
     arg: Arc<dyn PhysicalExpr>,
@@ -128,6 +129,11 @@ impl PhysicalExpr for NegativeExpr {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(NegativeExpr::new(children[0].clone())))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for NegativeExpr {
diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs
index 7c7d8cc897..584d1d6695 100644
--- a/datafusion/physical-expr/src/expressions/no_op.rs
+++ b/datafusion/physical-expr/src/expressions/no_op.rs
@@ -18,6 +18,7 @@
 //! NoOp placeholder for physical operations
 
 use std::any::Any;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use arrow::{
@@ -33,7 +34,7 @@ use datafusion_expr::ColumnarValue;
 /// A place holder expression, can not be evaluated.
 ///
 /// Used in some cases where an `Arc<dyn PhysicalExpr>` is needed, such as `children()`
-#[derive(Debug, PartialEq, Eq, Default)]
+#[derive(Debug, PartialEq, Eq, Default, Hash)]
 pub struct NoOp {}
 
 impl NoOp {
@@ -79,6 +80,11 @@ impl PhysicalExpr for NoOp {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(self)
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for NoOp {
diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs
index bf935aa97e..51cb8235a0 100644
--- a/datafusion/physical-expr/src/expressions/not.rs
+++ b/datafusion/physical-expr/src/expressions/not.rs
@@ -19,6 +19,7 @@
 
 use std::any::Any;
 use std::fmt;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::physical_expr::down_cast_any_ref;
@@ -29,7 +30,7 @@ use datafusion_common::{cast::as_boolean_array, DataFusionError, Result, ScalarV
 use datafusion_expr::ColumnarValue;
 
 /// Not expression
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct NotExpr {
     /// Input expression
     arg: Arc<dyn PhysicalExpr>,
@@ -105,6 +106,11 @@ impl PhysicalExpr for NotExpr {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(NotExpr::new(children[0].clone())))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for NotExpr {
diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs
index bbb29d6fb5..92ffaa1a88 100644
--- a/datafusion/physical-expr/src/expressions/try_cast.rs
+++ b/datafusion/physical-expr/src/expressions/try_cast.rs
@@ -17,6 +17,7 @@
 
 use std::any::Any;
 use std::fmt;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::physical_expr::down_cast_any_ref;
@@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
 
 /// TRY_CAST expression casts an expression to a specific data type and retuns NULL on invalid cast
-#[derive(Debug)]
+#[derive(Debug, Hash)]
 pub struct TryCastExpr {
     /// The expression to cast
     expr: Arc<dyn PhysicalExpr>,
@@ -105,6 +106,11 @@ impl PhysicalExpr for TryCastExpr {
             self.cast_type.clone(),
         )))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 impl PartialEq<dyn Any> for TryCastExpr {
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index d6dd14e8a1..68525920a0 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -33,6 +33,7 @@ use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterato
 
 use crate::intervals::Interval;
 use std::any::Any;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 /// Expression that can be evaluated against a RecordBatch
@@ -104,6 +105,44 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
             "Not implemented for {self}"
         )))
     }
+
+    /// Update the hash `state` with this expression, following the contract of
+    /// [`Hash`] (expressions that compare equal must produce equal hashes).
+    ///
+    /// This method is required to support hashing [`PhysicalExpr`]s. To
+    /// implement it, the type implementing [`PhysicalExpr`] typically
+    /// implements [`Hash`] itself (e.g. via `#[derive(Hash)]`), and then the
+    /// following boilerplate is used:
+    ///
+    /// # Example:
+    /// ```
+    /// // User defined expression that derives Hash
+    /// #[derive(Hash, Debug, PartialEq, Eq)]
+    /// struct MyExpr {
+    ///   val: u64
+    /// }
+    ///
+    /// // impl PhysicalExpr {
+    /// // ...
+    /// # impl MyExpr {
+    ///   // Boilerplate to call the derived Hash impl
+    ///   fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+    ///     use std::hash::Hash;
+    ///     let mut s = state;
+    ///     self.hash(&mut s);
+    ///   }
+    /// // }
+    /// # }
+    /// ```
+    /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
+    /// directly because it must remain object safe.
+    fn dyn_hash(&self, _state: &mut dyn Hasher);
+}
+
+impl Hash for dyn PhysicalExpr {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.dyn_hash(state);
+    }
 }
 
 /// Shared [`PhysicalExpr`].
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index da47a55aa9..ef771a6784 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -41,6 +41,7 @@ use datafusion_expr::ScalarFunctionImplementation;
 use std::any::Any;
 use std::fmt::Debug;
 use std::fmt::{self, Formatter};
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 /// Physical expression of a scalar function
@@ -162,6 +163,14 @@ impl PhysicalExpr for ScalarFunctionExpr {
             self.return_type(),
         )))
     }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.name.hash(&mut s);
+        self.args.hash(&mut s);
+        self.return_type.hash(&mut s);
+        // Add `self.fun` when hash is available
+    }
 }
 
 impl PartialEq<dyn Any> for ScalarFunctionExpr {
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index df519551d8..659e8c85e8 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -22,6 +22,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 /// Represents Sort operation for a column in a RecordBatch
@@ -39,6 +40,15 @@ impl PartialEq for PhysicalSortExpr {
     }
 }
 
+impl Eq for PhysicalSortExpr {}
+
+impl Hash for PhysicalSortExpr {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.expr.hash(state);
+        self.options.hash(state);
+    }
+}
+
 impl std::fmt::Display for PhysicalSortExpr {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{} {}", self.expr, to_str(&self.options))