Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/06 11:41:33 UTC

[arrow-datafusion] branch master updated: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b7a33317c [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)
b7a33317c is described below

commit b7a33317c2abf265f4ab6b3fe636f87c4d01334c
Author: mingmwang <mi...@ebay.com>
AuthorDate: Sun Nov 6 19:41:27 2022 +0800

    [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)
    
    * [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement
    
    * Fix hash join output_partitioning
    
    * fix
    
    * fix format
    
    * Resolve review comments
    
    * tiny fix
    
    * UT to verify hash join output_partitioning
    
    * fix comments
---
 datafusion/core/src/dataframe.rs                   | 162 +++++++++++++
 .../core/src/physical_optimizer/merge_exec.rs      |  35 ++-
 .../core/src/physical_plan/aggregates/mod.rs       |  63 ++++-
 datafusion/core/src/physical_plan/analyze.rs       |   8 +-
 .../core/src/physical_plan/coalesce_batches.rs     |   8 +-
 .../core/src/physical_plan/coalesce_partitions.rs  |   8 +-
 datafusion/core/src/physical_plan/empty.rs         |   6 +-
 datafusion/core/src/physical_plan/explain.rs       |   4 -
 .../core/src/physical_plan/file_format/avro.rs     |   4 -
 .../core/src/physical_plan/file_format/csv.rs      |   4 -
 .../core/src/physical_plan/file_format/json.rs     |   4 -
 .../core/src/physical_plan/file_format/parquet.rs  |   4 -
 datafusion/core/src/physical_plan/filter.rs        |  91 +++++++-
 .../core/src/physical_plan/joins/cross_join.rs     |  28 ++-
 .../core/src/physical_plan/joins/hash_join.rs      |  90 ++++++--
 .../src/physical_plan/joins/sort_merge_join.rs     | 147 ++++++++++--
 datafusion/core/src/physical_plan/joins/utils.rs   | 129 ++++++++++-
 datafusion/core/src/physical_plan/limit.rs         |  24 +-
 datafusion/core/src/physical_plan/memory.rs        |   4 -
 datafusion/core/src/physical_plan/mod.rs           |  59 ++++-
 datafusion/core/src/physical_plan/planner.rs       |  40 ++--
 datafusion/core/src/physical_plan/projection.rs    |  80 ++++++-
 datafusion/core/src/physical_plan/repartition.rs   |  12 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    |  18 +-
 .../physical_plan/sorts/sort_preserving_merge.rs   |  13 +-
 datafusion/core/src/physical_plan/union.rs         | 179 ++++++++++++--
 datafusion/core/src/physical_plan/values.rs        |  13 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |   4 +
 .../src/physical_plan/windows/window_agg_exec.rs   |  39 +++-
 .../core/src/scheduler/pipeline/execution.rs       |   4 +-
 datafusion/core/tests/user_defined_plan.rs         |   8 +-
 datafusion/physical-expr/src/equivalence.rs        | 256 +++++++++++++++++++++
 datafusion/physical-expr/src/expressions/column.rs |  70 ++++++
 datafusion/physical-expr/src/expressions/mod.rs    |   2 +-
 datafusion/physical-expr/src/lib.rs                |   9 +
 datafusion/physical-expr/src/utils.rs              | 142 +++++++++++-
 36 files changed, 1537 insertions(+), 234 deletions(-)
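
Before the per-file diff, a note on the shape of the change: the old single-valued required_child_distribution() is replaced by per-child required_input_distribution() and required_input_ordering() methods, and relies_on_input_order() becomes derived from those requirements rather than hard-coded per operator. The following is a minimal, self-contained sketch of that trait surface using toy stand-in types; the real methods take Arc<dyn PhysicalExpr> hash keys and &[PhysicalSortExpr] orderings, and MergeJoinLike is purely illustrative.

    // Toy stand-ins: real hash keys are Arc<dyn PhysicalExpr>,
    // real orderings are &[PhysicalSortExpr].
    #[derive(Debug, Clone)]
    enum Distribution {
        UnspecifiedDistribution,
        SinglePartition,
        HashPartitioned(Vec<String>),
    }

    trait ExecutionPlan {
        fn children(&self) -> Vec<Box<dyn ExecutionPlan>>;

        // Replaces the old single-valued required_child_distribution():
        // one requirement per child (one element for leaf nodes, as in
        // the default added by this commit).
        fn required_input_distribution(&self) -> Vec<Distribution> {
            if self.children().is_empty() {
                vec![Distribution::UnspecifiedDistribution]
            } else {
                vec![Distribution::UnspecifiedDistribution; self.children().len()]
            }
        }

        // New: per-child ordering requirement (None = no requirement).
        fn required_input_ordering(&self) -> Vec<Option<Vec<String>>> {
            vec![None; self.children().len()]
        }

        // Now derived from the requirements instead of hard-coded.
        fn relies_on_input_order(&self) -> bool {
            self.required_input_ordering().iter().any(Option::is_some)
        }
    }

    // An illustrative two-input operator that needs both sides hashed and
    // sorted on their join keys, as SortMergeJoinExec does below.
    struct MergeJoinLike;

    impl ExecutionPlan for MergeJoinLike {
        fn children(&self) -> Vec<Box<dyn ExecutionPlan>> {
            vec![] // children elided in this sketch
        }
        fn required_input_distribution(&self) -> Vec<Distribution> {
            vec![
                Distribution::HashPartitioned(vec!["l_key".into()]),
                Distribution::HashPartitioned(vec!["r_key".into()]),
            ]
        }
        fn required_input_ordering(&self) -> Vec<Option<Vec<String>>> {
            vec![Some(vec!["l_key".into()]), Some(vec!["r_key".into()])]
        }
    }

    fn main() {
        assert!(MergeJoinLike.relies_on_input_order());
    }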

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index aa5958fe6..3d1c8d009 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -842,6 +842,8 @@ mod tests {
     use super::*;
     use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
     use crate::physical_plan::ColumnarValue;
+    use crate::physical_plan::Partitioning;
+    use crate::physical_plan::PhysicalExpr;
     use crate::test_util;
     use crate::test_util::parquet_test_data;
     use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
@@ -851,6 +853,7 @@ mod tests {
         avg, cast, count, count_distinct, create_udf, lit, max, min, sum,
         BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction,
     };
+    use datafusion_physical_expr::expressions::Column;
 
     #[tokio::test]
     async fn select_columns() -> Result<()> {
@@ -1515,4 +1518,163 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =
+            left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+        let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+
+        let union = join1.union(join2)?;
+
+        let union_rows = union.collect().await?;
+
+        assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+        let physical_plan = union.create_physical_plan().await?;
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+
+        // For partition aware union, the output partition count should not be changed.
+        assert_eq!(
+            physical_plan.output_partitioning().partition_count(),
+            default_partition_count
+        );
+        // For partition aware union, the output partitioning is the same as each of the union's inputs
+        for child in physical_plan.children() {
+            assert_eq!(
+                physical_plan.output_partitioning(),
+                child.output_partitioning()
+            );
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn non_partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 = left.join(
+            right.clone(),
+            JoinType::Inner,
+            &["c1", "c2"],
+            &["c2_c1", "c2_c2"],
+            None,
+        )?;
+
+        // join key ordering is different
+        let join2 = left.join(
+            right,
+            JoinType::Inner,
+            &["c2", "c1"],
+            &["c2_c2", "c2_c1"],
+            None,
+        )?;
+
+        let union = join1.union(join2)?;
+
+        let union_rows = union.collect().await?;
+
+        assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+        let physical_plan = union.create_physical_plan().await?;
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+
+        // For non-partition aware union, the output partition count is the sum of all the inputs' partition counts
+        assert!(matches!(
+            physical_plan.output_partitioning(),
+            Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn verify_join_output_partitioning() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let all_join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::RightSemi,
+            JoinType::LeftAnti,
+            JoinType::RightAnti,
+        ];
+
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+
+        for join_type in all_join_types {
+            let join = left.join(
+                right.clone(),
+                join_type,
+                &["c1", "c2"],
+                &["c2_c1", "c2_c2"],
+                None,
+            )?;
+            let physical_plan = join.create_physical_plan().await?;
+            let out_partitioning = physical_plan.output_partitioning();
+            let join_schema = physical_plan.schema();
+
+            match join_type {
+                JoinType::Inner
+                | JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti => {
+                    let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+                        Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
+                        Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
+                    ];
+                    assert_eq!(
+                        out_partitioning,
+                        Partitioning::Hash(left_exprs, default_partition_count)
+                    );
+                }
+                JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
+                    let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+                        Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()),
+                        Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()),
+                    ];
+                    assert_eq!(
+                        out_partitioning,
+                        Partitioning::Hash(right_exprs, default_partition_count)
+                    );
+                }
+                JoinType::Full => {
+                    assert!(matches!(
+                        out_partitioning,
+                    Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
+                }
+            }
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs
index 960d0582f..1ed0f6196 100644
--- a/datafusion/core/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/core/src/physical_optimizer/merge_exec.rs
@@ -52,27 +52,20 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
                 .iter()
                 .map(|child| self.optimize(child.clone(), _config))
                 .collect::<Result<Vec<_>>>()?;
-            match plan.required_child_distribution() {
-                Distribution::UnspecifiedDistribution => {
-                    with_new_children_if_necessary(plan, children)
-                }
-                Distribution::HashPartitioned(_) => {
-                    with_new_children_if_necessary(plan, children)
-                }
-                Distribution::SinglePartition => with_new_children_if_necessary(
-                    plan,
-                    children
-                        .iter()
-                        .map(|child| {
-                            if child.output_partitioning().partition_count() == 1 {
-                                child.clone()
-                            } else {
-                                Arc::new(CoalescePartitionsExec::new(child.clone()))
-                            }
-                        })
-                        .collect(),
-                ),
-            }
+            assert_eq!(children.len(), plan.required_input_distribution().len());
+            let new_children = children
+                .into_iter()
+                .zip(plan.required_input_distribution())
+                .map(|(child, dist)| match dist {
+                    Distribution::SinglePartition
+                        if child.output_partitioning().partition_count() > 1 =>
+                    {
+                        Arc::new(CoalescePartitionsExec::new(child.clone()))
+                    }
+                    _ => child,
+                })
+                .collect::<Vec<_>>();
+            with_new_children_if_necessary(plan, new_children)
         }
     }
 
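In words: the rewritten rule pairs each child with its own required distribution and only wraps a child in CoalescePartitionsExec where a single partition is required but the child produces several. A standalone sketch of that zip-and-wrap pattern under toy types (Plan and Dist here are illustrative, not DataFusion types):

    #[derive(Clone, Debug, PartialEq)]
    enum Dist {
        Unspecified,
        Single,
    }

    #[derive(Clone, Debug, PartialEq)]
    enum Plan {
        Leaf { partitions: usize },
        Coalesce(Box<Plan>),
    }

    impl Plan {
        fn partition_count(&self) -> usize {
            match self {
                Plan::Leaf { partitions } => *partitions,
                // CoalescePartitionsExec always merges to one partition.
                Plan::Coalesce(_) => 1,
            }
        }
    }

    // Mirror of the rewritten rule body: coalesce a child only when the
    // parent requires a single partition and the child produces several.
    fn enforce(children: Vec<Plan>, required: Vec<Dist>) -> Vec<Plan> {
        assert_eq!(children.len(), required.len());
        children
            .into_iter()
            .zip(required)
            .map(|(child, dist)| match dist {
                Dist::Single if child.partition_count() > 1 => {
                    Plan::Coalesce(Box::new(child))
                }
                _ => child,
            })
            .collect()
    }

    fn main() {
        let out = enforce(
            vec![Plan::Leaf { partitions: 8 }, Plan::Leaf { partitions: 1 }],
            vec![Dist::Single, Dist::Single],
        );
        assert!(matches!(out[0], Plan::Coalesce(_)));
        assert_eq!(out[1], Plan::Leaf { partitions: 1 });
    }
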
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index e453b65cc..6fda8f77b 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -37,6 +37,7 @@ use datafusion_physical_expr::{
     expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
 };
 use std::any::Any;
+use std::collections::HashMap;
 
 use std::sync::Arc;
 
@@ -45,9 +46,11 @@ mod no_grouping;
 mod row_hash;
 
 use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2;
+use crate::physical_plan::EquivalenceProperties;
 pub use datafusion_expr::AggregateFunction;
 use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
+use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
 use datafusion_row::{row_supported, RowType};
 
 /// Hash aggregate modes
@@ -163,6 +166,9 @@ pub struct AggregateExec {
     /// same as input.schema() but for the final aggregate it will be the same as the input
     /// to the partial aggregate
     input_schema: SchemaRef,
+    /// The alias map used to normalize output expressions, such as Partitioning and PhysicalSortExpr
+    /// The keys are columns from the input schema and the values are the corresponding columns from the output schema
+    alias_map: HashMap<Column, Vec<Column>>,
     /// Execution Metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -186,6 +192,18 @@ impl AggregateExec {
 
         let schema = Arc::new(schema);
 
+        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+        for (expression, name) in group_by.expr.iter() {
+            if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+                let new_col_idx = schema.index_of(name)?;
+                // When the column name matches but the index differs, treat it as an alias
+                if (column.name() != name) || (column.index() != new_col_idx) {
+                    let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new);
+                    entry.push(Column::new(name, new_col_idx));
+                }
+            };
+        }
+
         Ok(AggregateExec {
             mode,
             group_by,
@@ -193,6 +211,7 @@ impl AggregateExec {
             input,
             schema,
             input_schema,
+            alias_map,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -255,25 +274,51 @@ impl ExecutionPlan for AggregateExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+        match &self.mode {
+            AggregateMode::Partial => {
+                // Partial Aggregation will not change the output partitioning, but it needs to respect the aliases
+                let input_partition = self.input.output_partitioning();
+                match input_partition {
+                    Partitioning::Hash(exprs, part) => {
+                        let normalized_exprs = exprs
+                            .into_iter()
+                            .map(|expr| {
+                                normalize_out_expr_with_alias_schema(
+                                    expr,
+                                    &self.alias_map,
+                                    &self.schema,
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        Partitioning::Hash(normalized_exprs, part)
+                    }
+                    _ => input_partition,
+                }
+            }
+            // Final Aggregation's output partitioning is the same as its input's
+            _ => self.input.output_partitioning(),
+        }
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
     }
 
-    fn required_child_distribution(&self) -> Distribution {
+    fn required_input_distribution(&self) -> Vec<Distribution> {
         match &self.mode {
-            AggregateMode::Partial => Distribution::UnspecifiedDistribution,
-            AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
-                self.group_by.expr.iter().map(|x| x.0.clone()).collect(),
-            ),
-            AggregateMode::Final => Distribution::SinglePartition,
+            AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
+            AggregateMode::FinalPartitioned => {
+                vec![Distribution::HashPartitioned(self.output_group_expr())]
+            }
+            AggregateMode::Final => vec![Distribution::SinglePartition],
         }
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        let mut input_equivalence_properties = self.input.equivalence_properties();
+        input_equivalence_properties.merge_properties_with_alias(&self.alias_map);
+        input_equivalence_properties.truncate_properties_not_in_schema(&self.schema);
+        input_equivalence_properties
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
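
The new alias_map records how input group-by columns reappear, possibly renamed or re-indexed, in the aggregate's output schema; output_partitioning() then rewrites Hash keys through it so downstream operators see the partitioning property in terms of output columns. A toy illustration of how the map is populated (Col is a stand-in for datafusion_physical_expr's Column):

    use std::collections::HashMap;

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct Col {
        name: String,
        index: usize,
    }

    fn main() {
        // SELECT c1 AS k, ... FROM t GROUP BY c1
        // input schema:  c1 at index 0; output schema: k at index 0.
        let group_col = Col { name: "c1".into(), index: 0 };
        let out_name = "k";
        let out_index = 0;

        let mut alias_map: HashMap<Col, Vec<Col>> = HashMap::new();
        // Same rule as AggregateExec::try_new above: if either the name or
        // the index changed, record the output column as an alias.
        if group_col.name != out_name || group_col.index != out_index {
            alias_map
                .entry(group_col.clone())
                .or_default()
                .push(Col { name: out_name.into(), index: out_index });
        }

        // Hash(c1@0) on the input can now be reported as Hash(k@0) on the
        // aggregate's output, keeping the partitioning property visible.
        assert_eq!(alias_map[&group_col][0].name, "k");
    }
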
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 8134ee7d2..b0578bd48 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -72,8 +72,8 @@ impl ExecutionPlan for AnalyzeExec {
     }
 
     /// Specifies we want the input as a single stream
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
     }
 
     /// Get the output partitioning of this plan
@@ -85,10 +85,6 @@ impl ExecutionPlan for AnalyzeExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         mut children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 317500ddc..e7c492732 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -25,8 +25,8 @@ use std::task::{Context, Poll};
 
 use crate::error::Result;
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream,
+    DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream,
 };
 
 use crate::execution::context::TaskContext;
@@ -100,8 +100,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
     }
 
     fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index d1c797eac..816a9c940 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -33,7 +33,9 @@ use super::expressions::PhysicalSortExpr;
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use crate::physical_plan::{
+    DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+};
 
 use super::SendableRecordBatchStream;
 use crate::execution::context::TaskContext;
@@ -87,8 +89,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
     }
 
     fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index c693764c8..4751dade1 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
-    memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning,
 };
 use arrow::array::NullArray;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -98,10 +98,6 @@ impl ExecutionPlan for EmptyExec {
         vec![]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
-    }
-
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(self.partitions)
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index 15f459fb0..ac350b183 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -97,10 +97,6 @@ impl ExecutionPlan for ExplainExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 2aab84fad..38178a976 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -76,10 +76,6 @@ impl ExecutionPlan for AvroExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         Vec::new()
     }
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index d086a7798..51180c0f0 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -109,10 +109,6 @@ impl ExecutionPlan for CsvExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
     }
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index c8c5d71bd..ceb9e7958 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -91,10 +91,6 @@ impl ExecutionPlan for NdJsonExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         Vec::new()
     }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index c803dabd0..1f98dd88c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -251,10 +251,6 @@ impl ExecutionPlan for ParquetExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index b4e3edaee..17a2355d0 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -28,13 +28,17 @@ use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
-    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+    Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+    PhysicalExpr,
 };
 use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::split_conjunction;
 
 use log::debug;
 
@@ -113,8 +117,14 @@ impl ExecutionPlan for FilterExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        // Combine the equal predicates with the input equivalence properties
+        let mut input_properties = self.input.equivalence_properties();
+        let (equal_pairs, _ne_pairs) = collect_columns_from_predicate(&self.predicate);
+        for new_condition in equal_pairs {
+            input_properties.add_equal_conditions(new_condition)
+        }
+        input_properties
     }
 
     fn with_new_children(
@@ -231,6 +241,38 @@ impl RecordBatchStream for FilterExecStream {
     }
 }
 
+/// Return the equal column pairs and the non-equal column pairs found in the predicate
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
+    let mut eq_predicate_columns: Vec<(&Column, &Column)> = Vec::new();
+    let mut ne_predicate_columns: Vec<(&Column, &Column)> = Vec::new();
+
+    let predicates = split_conjunction(predicate);
+    predicates.into_iter().for_each(|p| {
+        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
+            let left = binary.left();
+            let right = binary.right();
+            if left.as_any().is::<Column>() && right.as_any().is::<Column>() {
+                let left_column = left.as_any().downcast_ref::<Column>().unwrap();
+                let right_column = right.as_any().downcast_ref::<Column>().unwrap();
+                match binary.op() {
+                    Operator::Eq => {
+                        eq_predicate_columns.push((left_column, right_column))
+                    }
+                    Operator::NotEq => {
+                        ne_predicate_columns.push((left_column, right_column))
+                    }
+                    _ => {}
+                }
+            }
+        }
+    });
+
+    (eq_predicate_columns, ne_predicate_columns)
+}
+/// The equal column pairs and the non-equal column pairs found in the predicates
+pub type EqualAndNonEqual<'a> =
+    (Vec<(&'a Column, &'a Column)>, Vec<(&'a Column, &'a Column)>);
+
 #[cfg(test)]
 mod tests {
 
@@ -295,4 +337,47 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn collect_columns_predicates() -> Result<()> {
+        let schema = test_util::aggr_test_schema();
+        let predicate: Arc<dyn PhysicalExpr> = binary(
+            binary(
+                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
+                Operator::And,
+                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
+                &schema,
+            )?,
+            Operator::And,
+            binary(
+                binary(
+                    col("c2", &schema)?,
+                    Operator::Eq,
+                    col("c9", &schema)?,
+                    &schema,
+                )?,
+                Operator::And,
+                binary(
+                    col("c1", &schema)?,
+                    Operator::NotEq,
+                    col("c13", &schema)?,
+                    &schema,
+                )?,
+                &schema,
+            )?,
+            &schema,
+        )?;
+
+        let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
+
+        assert_eq!(1, equal_pairs.len());
+        assert_eq!(equal_pairs[0].0.name(), "c2");
+        assert_eq!(equal_pairs[0].1.name(), "c9");
+
+        assert_eq!(1, ne_pairs.len());
+        assert_eq!(ne_pairs[0].0.name(), "c1");
+        assert_eq!(ne_pairs[0].1.name(), "c13");
+
+        Ok(())
+    }
 }
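
FilterExec now feeds each column-equality predicate into the input's equivalence properties via add_equal_conditions. The full EquivalenceProperties type is added in datafusion/physical-expr/src/equivalence.rs (not reproduced in this excerpt); the sketch below shows only the class-merging idea it needs, under simplified string-keyed classes: each equality either extends an existing class, merges two classes, or starts a new one.

    // Each inner Vec is one equivalence class of column names.
    fn add_equal_condition(classes: &mut Vec<Vec<String>>, a: &str, b: &str) {
        let pa = classes.iter().position(|c| c.iter().any(|x| x == a));
        let pb = classes.iter().position(|c| c.iter().any(|x| x == b));
        match (pa, pb) {
            // The equality bridges two distinct classes: merge them.
            (Some(i), Some(j)) if i != j => {
                let moved = classes.remove(i.max(j));
                classes[i.min(j)].extend(moved);
            }
            // Both columns already in the same class: nothing to do.
            (Some(_), Some(_)) => {}
            // One side is known: the other joins its class.
            (Some(i), None) => classes[i].push(b.to_string()),
            (None, Some(j)) => classes[j].push(a.to_string()),
            // Neither is known: start a new class.
            (None, None) => classes.push(vec![a.to_string(), b.to_string()]),
        }
    }

    fn main() {
        let mut classes: Vec<Vec<String>> = vec![];
        add_equal_condition(&mut classes, "c2", "c9"); // e.g. WHERE c2 = c9
        add_equal_condition(&mut classes, "c9", "c4");
        assert_eq!(classes, vec![vec!["c2", "c9", "c4"]]);
    }
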
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 7a35116a4..a71e06cce 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -29,16 +29,19 @@ use arrow::record_batch::RecordBatch;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::{
     coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
-    ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
+    Partitioning, PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream,
+    Statistics,
 };
 use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
-use datafusion_physical_expr::PhysicalSortExpr;
 use log::debug;
 use std::time::Instant;
 
-use super::utils::{check_join_is_valid, OnceAsync, OnceFut};
+use super::utils::{
+    adjust_right_output_partitioning, check_join_is_valid,
+    cross_join_equivalence_properties, OnceAsync, OnceFut,
+};
 
 /// Data of the left side
 type JoinLeftData = RecordBatch;
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
         )?))
     }
 
+    // TODO optimize CrossJoin implementation to generate M * N partitions
     fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+        let left_columns_len = self.left.schema().fields.len();
+        adjust_right_output_partitioning(
+            self.right.output_partitioning(),
+            left_columns_len,
+        )
     }
 
+    // TODO check the output ordering of CrossJoin
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        let left_columns_len = self.left.schema().fields.len();
+        cross_join_equivalence_properties(
+            self.left.equivalence_properties(),
+            self.right.equivalence_properties(),
+            left_columns_len,
+        )
     }
 
     fn execute(
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 839a24112..07b4d6f4a 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -62,12 +62,13 @@ use crate::physical_plan::{
     expressions::PhysicalSortExpr,
     hash_utils::create_hashes,
     joins::utils::{
-        build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
-        JoinFilter, JoinOn, JoinSide,
+        adjust_right_output_partitioning, build_join_schema, check_join_is_valid,
+        combine_join_equivalence_properties, estimate_join_statistics,
+        partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn, JoinSide,
     },
     metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
+    PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 
 use crate::error::{DataFusionError, Result};
@@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec {
         self.schema.clone()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        match self.mode {
+            PartitionMode::CollectLeft => vec![
+                Distribution::SinglePartition,
+                Distribution::UnspecifiedDistribution,
+            ],
+            PartitionMode::Partitioned => {
+                let (left_expr, right_expr) = self
+                    .on
+                    .iter()
+                    .map(|(l, r)| {
+                        (
+                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                        )
+                    })
+                    .unzip();
+                vec![
+                    Distribution::HashPartitioned(left_expr),
+                    Distribution::HashPartitioned(right_expr),
+                ]
+            }
+        }
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        let left_columns_len = self.left.schema().fields.len();
+        match self.mode {
+            PartitionMode::CollectLeft => match self.join_type {
+                JoinType::Inner | JoinType::Right => adjust_right_output_partitioning(
+                    self.right.output_partitioning(),
+                    left_columns_len,
+                ),
+                JoinType::RightSemi | JoinType::RightAnti => {
+                    self.right.output_partitioning()
+                }
+                JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti
+                | JoinType::Full => Partitioning::UnknownPartitioning(
+                    self.right.output_partitioning().partition_count(),
+                ),
+            },
+            PartitionMode::Partitioned => partitioned_join_output_partitioning(
+                self.join_type,
+                self.left.output_partitioning(),
+                self.right.output_partitioning(),
+                left_columns_len,
+            ),
+        }
+    }
+
+    // TODO Output ordering might be kept in some cases.
+    // For example, for an inner join the stream side's ordering can be kept
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        let left_columns_len = self.left.schema().fields.len();
+        combine_join_equivalence_properties(
+            self.join_type,
+            self.left.equivalence_properties(),
+            self.right.equivalence_properties(),
+            left_columns_len,
+            self.on(),
+        )
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.left.clone(), self.right.clone()]
     }
@@ -289,18 +359,6 @@ impl ExecutionPlan for HashJoinExec {
         )?))
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn execute(
         &self,
         partition: usize,
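
For CollectLeft mode the left side is collected into a single batch, so only the right child's partitioning can survive, and then only for join types whose output rows preserve the right-side hash keys. A toy decision table for that logic (Part and the elided index shift are illustrative; the real Inner/Right arms also run the keys through adjust_right_output_partitioning):

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum JoinType {
        Inner,
        Left,
        Right,
        Full,
        LeftSemi,
        RightSemi,
        LeftAnti,
        RightAnti,
    }

    #[derive(Clone, Debug, PartialEq)]
    enum Part {
        Hash(Vec<String>, usize),
        Unknown(usize),
    }

    // CollectLeft: only join types whose output preserves right-side hash
    // keys keep a Hash partitioning; everything else degrades to Unknown.
    fn collect_left_output(join_type: JoinType, right: Part) -> Part {
        let n = match &right {
            Part::Hash(_, n) | Part::Unknown(n) => *n,
        };
        match join_type {
            // Right keys survive; RightSemi/RightAnti keep the right
            // schema, so no column-index shift is needed either.
            JoinType::RightSemi | JoinType::RightAnti => right,
            // Right keys survive too, but their indexes shift past the
            // left columns in the output schema (shift elided here).
            JoinType::Inner | JoinType::Right => right,
            // The remaining types can emit rows with no right-side match,
            // so the hash property is lost.
            _ => Part::Unknown(n),
        }
    }

    fn main() {
        let hash4 = Part::Hash(vec!["k".into()], 4);
        assert_eq!(collect_left_output(JoinType::Left, hash4), Part::Unknown(4));
    }
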
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 92392455e..44771ba4c 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -43,14 +43,17 @@ use crate::physical_plan::common::combine_batches;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::joins::utils::{
-    build_join_schema, check_join_is_valid, JoinOn,
+    build_join_schema, check_join_is_valid, combine_join_equivalence_properties,
+    partitioned_join_output_partitioning, JoinOn,
 };
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::physical_plan::{
-    metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
+    Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
+
 /// join execution plan executes partitions in parallel and combines them into a set of
 /// partitions.
 #[derive(Debug)]
@@ -67,6 +70,12 @@ pub struct SortMergeJoinExec {
     schema: SchemaRef,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// The left SortExpr
+    left_sort_exprs: Vec<PhysicalSortExpr>,
+    /// The right SortExpr
+    right_sort_exprs: Vec<PhysicalSortExpr>,
+    /// The output ordering
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Sort options of join columns used in sorting left and right execution plans
     sort_options: Vec<SortOptions>,
     /// If null_equals_null is true, null == null else null != null
@@ -104,6 +113,75 @@ impl SortMergeJoinExec {
             )));
         }
 
+        let (left_expr, right_expr): (Vec<_>, Vec<_>) = on
+            .iter()
+            .map(|(l, r)| {
+                (
+                    Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                    Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                )
+            })
+            .unzip();
+
+        let left_sort_exprs = left_expr
+            .into_iter()
+            .zip(sort_options.iter())
+            .map(|(k, sort_op)| PhysicalSortExpr {
+                expr: k,
+                options: *sort_op,
+            })
+            .collect::<Vec<_>>();
+
+        let right_sort_exprs = right_expr
+            .into_iter()
+            .zip(sort_options.iter())
+            .map(|(k, sort_op)| PhysicalSortExpr {
+                expr: k,
+                options: *sort_op,
+            })
+            .collect::<Vec<_>>();
+
+        let output_ordering = match join_type {
+            JoinType::Inner
+            | JoinType::Left
+            | JoinType::LeftSemi
+            | JoinType::LeftAnti => {
+                left.output_ordering().map(|sort_exprs| sort_exprs.to_vec())
+            }
+            JoinType::RightSemi | JoinType::RightAnti => right
+                .output_ordering()
+                .map(|sort_exprs| sort_exprs.to_vec()),
+            JoinType::Right => {
+                let left_columns_len = left.schema().fields.len();
+                right.output_ordering().map(|sort_exprs| {
+                    sort_exprs
+                        .iter()
+                        .map(|e| {
+                            let new_expr = e
+                                .expr
+                                .clone()
+                                .transform_down(&|e| match e
+                                    .as_any()
+                                    .downcast_ref::<Column>()
+                                {
+                                    Some(col) => Some(Arc::new(Column::new(
+                                        col.name(),
+                                        left_columns_len + col.index(),
+                                    ))),
+                                    None => None,
+                                })
+                                .unwrap();
+                            PhysicalSortExpr {
+                                expr: new_expr,
+                                options: e.options,
+                            }
+                        })
+                        .collect::<Vec<_>>()
+                })
+            }
+            JoinType::Full => None,
+        };
+
         let schema =
             Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0);
 
@@ -114,10 +192,18 @@ impl SortMergeJoinExec {
             join_type,
             schema,
             metrics: ExecutionPlanMetricsSet::new(),
+            left_sort_exprs,
+            right_sort_exprs,
+            output_ordering,
             sort_options,
             null_equals_null,
         })
     }
+
+    /// Set of common columns used to join on
+    pub fn on(&self) -> &[(Column, Column)] {
+        &self.on
+    }
 }
 
 impl ExecutionPlan for SortMergeJoinExec {
@@ -129,25 +215,50 @@ impl ExecutionPlan for SortMergeJoinExec {
         self.schema.clone()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        let (left_expr, right_expr) = self
+            .on
+            .iter()
+            .map(|(l, r)| {
+                (
+                    Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                    Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                )
+            })
+            .unzip();
+        vec![
+            Distribution::HashPartitioned(left_expr),
+            Distribution::HashPartitioned(right_expr),
+        ]
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)]
+    }
+
     fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+        let left_columns_len = self.left.schema().fields.len();
+        partitioned_join_output_partitioning(
+            self.join_type,
+            self.left.output_partitioning(),
+            self.right.output_partitioning(),
+            left_columns_len,
+        )
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        match self.join_type {
-            JoinType::Inner
-            | JoinType::Left
-            | JoinType::LeftSemi
-            | JoinType::LeftAnti => self.left.output_ordering(),
-            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
-                self.right.output_ordering()
-            }
-            JoinType::Full => None,
-        }
+        self.output_ordering.as_deref()
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        let left_columns_len = self.left.schema().fields.len();
+        combine_join_equivalence_properties(
+            self.join_type,
+            self.left.equivalence_properties(),
+            self.right.equivalence_properties(),
+            left_columns_len,
+            self.on(),
+        )
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -228,8 +339,8 @@ impl ExecutionPlan for SortMergeJoinExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "SortMergeJoin: join_type={:?}, on={:?}, schema={:?}",
-                    self.join_type, self.on, &self.schema
+                    "SortMergeJoin: join_type={:?}, on={:?}",
+                    self.join_type, self.on
                 )
             }
         }
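
SortMergeJoinExec now states its needs up front: each input must be hash-partitioned on its join keys (required_input_distribution) and sorted on them with the configured sort options (required_input_ordering), which also makes the derived relies_on_input_order() return true. A small sketch of deriving the per-side sort expressions from the on clause (toy types, not the real PhysicalSortExpr):

    #[derive(Clone, Copy, Debug, PartialEq)]
    struct SortOptions {
        descending: bool,
        nulls_first: bool,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct SortExpr {
        column: String,
        options: SortOptions,
    }

    // One sort expression per join key, pairing each key with the join's
    // sort options, as SortMergeJoinExec::try_new does for both sides.
    fn sort_exprs_for_side(keys: &[&str], opts: &[SortOptions]) -> Vec<SortExpr> {
        keys.iter()
            .zip(opts)
            .map(|(k, o)| SortExpr {
                column: (*k).to_string(),
                options: *o,
            })
            .collect()
    }

    fn main() {
        let asc = SortOptions { descending: false, nulls_first: true };
        // on = [(c1, c2_c1), (c2, c2_c2)], as in the dataframe tests above.
        let left = sort_exprs_for_side(&["c1", "c2"], &[asc, asc]);
        let right = sort_exprs_for_side(&["c2_c1", "c2_c2"], &[asc, asc]);
        // These become required_input_ordering(): [Some(left), Some(right)],
        // which in turn makes relies_on_input_order() return true.
        assert_eq!(left.len(), 2);
        assert_eq!(right.len(), 2);
    }
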
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index d041e7dfb..905e59de9 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -23,7 +23,7 @@ use crate::physical_plan::expressions::Column;
 use arrow::datatypes::{Field, Schema};
 use arrow::error::ArrowError;
 use datafusion_common::ScalarValue;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{EquivalentClass, PhysicalExpr};
 use futures::future::{BoxFuture, Shared};
 use futures::{ready, FutureExt};
 use parking_lot::Mutex;
@@ -33,7 +33,10 @@ use std::future::Future;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::physical_plan::{
+    ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics,
+};
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
 
 /// The on clause of the join, as vector of (left, right) columns.
 pub type JoinOn = Vec<(Column, Column)>;
@@ -83,6 +86,128 @@ fn check_join_set_is_valid(
     Ok(())
 }
 
+/// Calculate the OutputPartitioning for Partitioned Join
+pub fn partitioned_join_output_partitioning(
+    join_type: JoinType,
+    left_partitioning: Partitioning,
+    right_partitioning: Partitioning,
+    left_columns_len: usize,
+) -> Partitioning {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
+            left_partitioning
+        }
+        JoinType::RightSemi | JoinType::RightAnti => right_partitioning,
+        JoinType::Right => {
+            adjust_right_output_partitioning(right_partitioning, left_columns_len)
+        }
+        JoinType::Full => {
+            Partitioning::UnknownPartitioning(right_partitioning.partition_count())
+        }
+    }
+}
+
+/// Adjust the right side's output partitioning to the column indexes of the join output schema
+pub fn adjust_right_output_partitioning(
+    right_partitioning: Partitioning,
+    left_columns_len: usize,
+) -> Partitioning {
+    match right_partitioning {
+        Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
+        Partitioning::UnknownPartitioning(size) => {
+            Partitioning::UnknownPartitioning(size)
+        }
+        Partitioning::Hash(exprs, size) => {
+            let new_exprs = exprs
+                .into_iter()
+                .map(|expr| {
+                    expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+                        Some(col) => Some(Arc::new(Column::new(
+                            col.name(),
+                            left_columns_len + col.index(),
+                        ))),
+                        None => None,
+                    })
+                    .unwrap()
+                })
+                .collect::<Vec<_>>();
+            Partitioning::Hash(new_exprs, size)
+        }
+    }
+}
+
+/// Combine the equivalence properties of the two inputs of a join node
+pub fn combine_join_equivalence_properties(
+    join_type: JoinType,
+    left_properties: EquivalenceProperties,
+    right_properties: EquivalenceProperties,
+    left_columns_len: usize,
+    on: &[(Column, Column)],
+) -> EquivalenceProperties {
+    let mut new_properties = match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
+            let mut left_properties = left_properties;
+            let new_right_properties = right_properties
+                .classes()
+                .iter()
+                .map(|prop| {
+                    let new_head = Column::new(
+                        prop.head().name(),
+                        left_columns_len + prop.head().index(),
+                    );
+                    let new_others = prop
+                        .others()
+                        .iter()
+                        .map(|col| {
+                            Column::new(col.name(), left_columns_len + col.index())
+                        })
+                        .collect::<Vec<_>>();
+                    EquivalentClass::new(new_head, new_others)
+                })
+                .collect::<Vec<_>>();
+
+            left_properties.extend(new_right_properties);
+            left_properties
+        }
+        JoinType::LeftSemi | JoinType::LeftAnti => left_properties,
+        JoinType::RightSemi | JoinType::RightAnti => right_properties,
+    };
+
+    if join_type == JoinType::Inner {
+        on.iter().for_each(|(column1, column2)| {
+            let new_column2 =
+                Column::new(column2.name(), left_columns_len + column2.index());
+            new_properties.add_equal_conditions((column1, &new_column2))
+        })
+    }
+    new_properties
+}
+
+/// Calculate the Equivalence Properties for CrossJoin Node
+pub fn cross_join_equivalence_properties(
+    left_properties: EquivalenceProperties,
+    right_properties: EquivalenceProperties,
+    left_columns_len: usize,
+) -> EquivalenceProperties {
+    let mut left_properties = left_properties;
+    let new_right_properties = right_properties
+        .classes()
+        .iter()
+        .map(|prop| {
+            let new_head =
+                Column::new(prop.head().name(), left_columns_len + prop.head().index());
+            let new_others = prop
+                .others()
+                .iter()
+                .map(|col| Column::new(col.name(), left_columns_len + col.index()))
+                .collect::<Vec<_>>();
+            EquivalentClass::new(new_head, new_others)
+        })
+        .collect::<Vec<_>>();
+    left_properties.extend(new_right_properties);
+    left_properties
+}
+
 /// Used in ColumnIndex to distinguish which side the index is for
 #[derive(Debug, Clone)]
 pub enum JoinSide {
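
The helpers above all hinge on one indexing fact: a join's output schema concatenates the left fields followed by the right fields, so any right-side column reference must be shifted by the number of left columns. A self-contained illustration (toy Column; the real adjust_right_output_partitioning applies the same shift via transform_down over arbitrary expressions):

    #[derive(Clone, Debug, PartialEq)]
    struct Column {
        name: String,
        index: usize,
    }

    // A right-side column at index i sits at left_columns_len + i in the
    // join's output schema (left fields first, then right fields).
    fn shift_right_column(col: Column, left_columns_len: usize) -> Column {
        Column {
            name: col.name,
            index: left_columns_len + col.index,
        }
    }

    fn main() {
        // Output schema [l0, l1, l2, r0, r1]: the left side has 3 columns,
        // so the right child's Hash key r1@1 becomes r1@4 in the output.
        let shifted = shift_right_column(Column { name: "r1".into(), index: 1 }, 3);
        assert_eq!(shifted, Column { name: "r1".into(), index: 4 });
    }
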
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 322c21ff4..171e2b2f0 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -27,7 +27,7 @@ use std::task::{Context, Poll};
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
-    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
 };
 use arrow::array::ArrayRef;
 use arrow::compute::limit;
@@ -98,10 +98,9 @@ impl ExecutionPlan for GlobalLimitExec {
         vec![self.input.clone()]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
     }
-
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
@@ -123,6 +122,10 @@ impl ExecutionPlan for GlobalLimitExec {
         self.input.output_ordering()
     }
 
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -281,14 +284,13 @@ impl ExecutionPlan for LocalLimitExec {
         false
     }
 
-    // Local limit does not make any attempt to maintain the input
-    // sortedness (if there is more than one partition)
+    // Local limit will not change the input plan's ordering
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        if self.output_partitioning().partition_count() == 1 {
-            self.input.output_ordering()
-        } else {
-            None
-        }
+        self.input.output_ordering()
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
     }
 
     fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 698eaf12a..7753a5ba7 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -81,10 +81,6 @@ impl ExecutionPlan for MemoryExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 55b46c991..87df26781 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
 
-    /// Specifies the data distribution requirements of all the
-    /// children for this operator
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
+    /// Specifies the data distribution requirements for all the
+    /// children of this operator. By default it is [`Distribution::UnspecifiedDistribution`] for each child.
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if !self.children().is_empty() {
+            vec![Distribution::UnspecifiedDistribution; self.children().len()]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    /// Specifies the ordering requirements for all the
+    /// children for this operator.
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        vec![None; self.children().len()]
     }
 
     /// Returns `true` if this operator relies on its inputs being
@@ -136,13 +146,17 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// optimizations which might reorder the inputs (such as
     /// repartitioning to increase concurrency).
     ///
-    /// The default implementation returns `true`
+    /// The default implementation checks the input ordering requirements:
+    /// if any child has a non-empty ordering requirement, this method
+    /// returns `true`.
     ///
     /// WARNING: if you override this default and return `false`, your
     /// operator can not rely on DataFusion preserving the input order
     /// as it will likely not.
     fn relies_on_input_order(&self) -> bool {
-        true
+        self.required_input_ordering()
+            .iter()
+            .any(|ordering| matches!(ordering, Some(_)))
     }
 
     /// Returns `false` if this operator's implementation may reorder
@@ -175,10 +189,15 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn benefits_from_input_partitioning(&self) -> bool {
         // By default try to maximize parallelism with more CPUs if
         // possible
-        !matches!(
-            self.required_child_distribution(),
-            Distribution::SinglePartition
-        )
+        !self
+            .required_input_distribution()
+            .into_iter()
+            .any(|dist| matches!(dist, Distribution::SinglePartition))
+    }
+
+    /// Get the EquivalenceProperties within the plan
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        EquivalenceProperties::new()
     }
 
     /// Get a list of child execution plans that provide the input for this plan. The returned list
@@ -460,6 +479,23 @@ impl Partitioning {
     }
 }
 
+impl PartialEq for Partitioning {
+    fn eq(&self, other: &Partitioning) -> bool {
+        match (self, other) {
+            (
+                Partitioning::RoundRobinBatch(count1),
+                Partitioning::RoundRobinBatch(count2),
+            ) if count1 == count2 => true,
+            (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
+                if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) =>
+            {
+                true
+            }
+            _ => false,
+        }
+    }
+}
+
 /// Distribution schemes
 #[derive(Debug, Clone)]
 pub enum Distribution {
@@ -472,7 +508,10 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
+use datafusion_physical_expr::expr_list_eq_strict_order;
+use datafusion_physical_expr::expressions::Column;
 pub use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::EquivalenceProperties;
 pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
 
 /// Applies an optional projection to a [`SchemaRef`], returning the
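
One subtlety of the new PartialEq for Partitioning: two Hash partitionings compare equal only when their expression lists match in strict order and their partition counts match, while UnknownPartitioning never compares equal to anything, itself included. That is why the union and full-join tests earlier in this commit use matches! rather than assert_eq!. A toy mirror of those semantics (RoundRobinBatch omitted; in the real impl it compares by partition count):

    #[derive(Debug)]
    enum Part {
        Hash(Vec<String>, usize),
        Unknown(usize),
    }

    impl PartialEq for Part {
        fn eq(&self, other: &Self) -> bool {
            match (self, other) {
                // Strict-order expression equality plus equal counts.
                (Part::Hash(e1, n1), Part::Hash(e2, n2)) => e1 == e2 && n1 == n2,
                // Unknown carries no guarantee, so it is never equal,
                // not even to itself.
                _ => false,
            }
        }
    }

    fn main() {
        assert_ne!(Part::Unknown(4), Part::Unknown(4));
        assert_eq!(
            Part::Hash(vec!["c1".into()], 4),
            Part::Hash(vec!["c1".into()], 4)
        );
    }
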
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index a0fbba2f8..88a43e111 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -522,8 +522,9 @@ impl DefaultPhysicalPlanner {
                         && session_state.config.target_partitions > 1
                         && session_state.config.repartition_windows;
 
-                    let input_exec = if can_repartition {
-                        let partition_keys = partition_keys
+                    let physical_partition_keys = if can_repartition {
+                        partition_keys
                             .iter()
                             .map(|e| {
                                 self.create_physical_expr(
@@ -533,11 +534,16 @@ impl DefaultPhysicalPlanner {
                                     session_state,
                                 )
                             })
-                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
+                    } else {
+                        vec![]
+                    };
+
+                    let input_exec = if can_repartition {
                         Arc::new(RepartitionExec::try_new(
                             input_exec,
                             Partitioning::Hash(
-                                partition_keys,
+                                physical_partition_keys.clone(),
                                 session_state.config.target_partitions,
                             ),
                         )?)
@@ -576,8 +582,8 @@ impl DefaultPhysicalPlanner {
 
                     let logical_input_schema = input.schema();
 
-                    let input_exec = if sort_keys.is_empty() {
-                        input_exec
+                    let physical_sort_keys = if sort_keys.is_empty() {
+                        None
                     } else {
                         let physical_input_schema = input_exec.schema();
                         let sort_keys = sort_keys
@@ -600,13 +606,19 @@ impl DefaultPhysicalPlanner {
                                 _ => unreachable!(),
                             })
                             .collect::<Result<Vec<_>>>()?;
-                        Arc::new(if can_repartition {
-                            SortExec::new_with_partitioning(sort_keys, input_exec, true, None)
-                        } else {
-                            SortExec::try_new(sort_keys, input_exec, None)?
-                        })
+                        Some(sort_keys)
                     };
 
+                    let input_exec = match physical_sort_keys.clone() {
+                        None => input_exec,
+                        Some(sort_exprs) => {
+                            if can_repartition {
+                                Arc::new(SortExec::new_with_partitioning(sort_exprs, input_exec, true, None))
+                            } else {
+                                Arc::new(SortExec::try_new(sort_exprs, input_exec, None)?)
+                            }
+                        },
+                    };
                     let physical_input_schema = input_exec.schema();
                     let window_expr = window_expr
                         .iter()
@@ -624,6 +636,8 @@ impl DefaultPhysicalPlanner {
                         window_expr,
                         input_exec,
                         physical_input_schema,
+                        physical_partition_keys,
+                        physical_sort_keys,
                     )?))
                 }
                 LogicalPlan::Aggregate(Aggregate {
@@ -2308,10 +2322,6 @@ mod tests {
             None
         }
 
-        fn relies_on_input_order(&self) -> bool {
-            false
-        }
-
         fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
             vec![]
         }
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 5fa3c93cd..2b6297f8c 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -21,14 +21,15 @@
 //! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
 
 use std::any::Any;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::error::Result;
 use crate::physical_plan::{
-    ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+    ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
+    Partitioning, PhysicalExpr,
 };
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -39,6 +40,7 @@ use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
 use crate::execution::context::TaskContext;
+use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
 use futures::stream::Stream;
 use futures::stream::StreamExt;
 
@@ -51,6 +53,11 @@ pub struct ProjectionExec {
     schema: SchemaRef,
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
+    /// The output ordering
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
+    /// The alias map used to normalize output expressions, such as Partitioning and PhysicalSortExpr.
+    /// The key is the column from the input schema and the values are the columns from the output schema
+    alias_map: HashMap<Column, Vec<Column>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -82,10 +89,47 @@ impl ProjectionExec {
             input_schema.metadata().clone(),
         ));
 
+        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+        for (expression, name) in expr.iter() {
+            if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+                let new_col_idx = schema.index_of(name)?;
+                // Treat the mapping as an alias when either the name or the index differs
+                if (column.name() != name) || (column.index() != new_col_idx) {
+                    let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new);
+                    entry.push(Column::new(name, new_col_idx));
+                }
+            };
+        }
+
+        // The output ordering needs to respect the alias
+        let child_output_ordering = input.output_ordering();
+        let output_ordering = match child_output_ordering {
+            Some(sort_exprs) => {
+                let normalized_exprs = sort_exprs
+                    .iter()
+                    .map(|sort_expr| {
+                        let expr = normalize_out_expr_with_alias_schema(
+                            sort_expr.expr.clone(),
+                            &alias_map,
+                            &schema,
+                        );
+                        PhysicalSortExpr {
+                            expr,
+                            options: sort_expr.options,
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                Some(normalized_exprs)
+            }
+            None => None,
+        };
+
         Ok(Self {
             expr,
             schema,
             input: input.clone(),
+            output_ordering,
+            alias_map,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
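For intuition, here is a hedged sketch of the alias map that `try_new` would build for a projection like `SELECT a, a AS a1, b AS b0` over an input schema `(a, b)` (standalone snippet; column indexes are illustrative):

    use std::collections::HashMap;
    use datafusion_physical_expr::expressions::Column;

    fn main() {
        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
        // "a" keeps its name and index in the output: no entry is recorded.
        // "a AS a1": same source column, new name at output index 1.
        alias_map
            .entry(Column::new("a", 0))
            .or_default()
            .push(Column::new("a1", 1));
        // "b AS b0": source index 1 maps to output index 2 under a new name.
        alias_map
            .entry(Column::new("b", 1))
            .or_default()
            .push(Column::new("b0", 2));
        assert_eq!(alias_map.len(), 2);
    }
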
@@ -118,11 +162,28 @@ impl ExecutionPlan for ProjectionExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+        // The output partitioning needs to respect the alias
+        let input_partition = self.input.output_partitioning();
+        match input_partition {
+            Partitioning::Hash(exprs, part) => {
+                let normalized_exprs = exprs
+                    .into_iter()
+                    .map(|expr| {
+                        normalize_out_expr_with_alias_schema(
+                            expr,
+                            &self.alias_map,
+                            &self.schema,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                Partitioning::Hash(normalized_exprs, part)
+            }
+            _ => input_partition,
+        }
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
+        self.output_ordering.as_deref()
     }
 
     fn maintains_input_order(&self) -> bool {
@@ -130,8 +191,15 @@ impl ExecutionPlan for ProjectionExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
+    // Equivalence properties need to be adjusted after the Projection:
+    // 1) Merge in aliases, since an alias can introduce additional
+    //    equivalence properties. For example: Projection(a, a as a1, a as a2)
+    // 2) Truncate the properties that are not in the schema of the Projection
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        let mut input_equivalence_properties = self.input.equivalence_properties();
+        input_equivalence_properties.merge_properties_with_alias(&self.alias_map);
+        input_equivalence_properties.truncate_properties_not_in_schema(&self.schema);
+        input_equivalence_properties
     }
 
     fn with_new_children(
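A hedged sketch of what this buys the optimizer (standalone snippet using the `EquivalenceProperties` API added later in this diff; the input equivalence a = b is assumed to come from an upstream equijoin or filter):

    use std::collections::HashMap;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::EquivalenceProperties;

    fn main() {
        // The input guarantees a = b.
        let mut eq = EquivalenceProperties::new();
        eq.add_equal_conditions((&Column::new("a", 0), &Column::new("b", 1)));

        // Projection `SELECT a, a AS a1`: the alias a -> a1 joins a's class.
        let mut alias_map = HashMap::new();
        alias_map.insert(Column::new("a", 0), vec![Column::new("a1", 1)]);
        eq.merge_properties_with_alias(&alias_map);

        // {a, b, a1} is now a single class; truncating against the output
        // schema (a, a1) would then drop b, which the projection removed.
        assert_eq!(eq.classes()[0].len(), 3);
    }
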
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 4c057b1d6..3e50ec645 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -25,7 +25,9 @@ use std::{any::Any, vec};
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
+use crate::physical_plan::{
+    DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics,
+};
 use arrow::array::{ArrayRef, UInt64Builder};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
@@ -272,10 +274,6 @@ impl ExecutionPlan for RepartitionExec {
         vec![self.input.clone()]
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -294,6 +292,10 @@ impl ExecutionPlan for RepartitionExec {
         None
     }
 
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
     fn execute(
         &self,
         partition: usize,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index cd14f27b6..fb289b68b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -46,6 +46,7 @@ use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion_physical_expr::EquivalenceProperties;
 use futures::lock::Mutex;
 use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
 use log::{debug, error};
@@ -743,11 +744,13 @@ impl ExecutionPlan for SortExec {
         }
     }
 
-    fn required_child_distribution(&self) -> Distribution {
+    fn required_input_distribution(&self) -> Vec<Distribution> {
         if self.preserve_partitioning {
-            Distribution::UnspecifiedDistribution
+            vec![Distribution::UnspecifiedDistribution]
         } else {
-            Distribution::SinglePartition
+            // global sort
+            // TODO support RangePartition and OrderedDistribution
+            vec![Distribution::SinglePartition]
         }
     }
 
@@ -755,11 +758,6 @@ impl ExecutionPlan for SortExec {
         vec![self.input.clone()]
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        // this operator resorts everything
-        false
-    }
-
     fn benefits_from_input_partitioning(&self) -> bool {
         false
     }
@@ -768,6 +766,10 @@ impl ExecutionPlan for SortExec {
         Some(&self.expr)
     }
 
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 17dea38c0..2fe5bc313 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -49,6 +49,7 @@ use crate::physical_plan::{
     Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use datafusion_physical_expr::EquivalenceProperties;
 
 /// Sort preserving merge execution plan
 ///
@@ -123,18 +124,22 @@ impl ExecutionPlan for SortPreservingMergeExec {
         Partitioning::UnknownPartitioning(1)
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::UnspecifiedDistribution]
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        vec![Some(&self.expr)]
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         Some(&self.expr)
     }
 
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index bf9dfbd1b..af57c9ef9 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -21,15 +21,19 @@
 
 //! The Union operator combines multiple inputs with the same schema
 
+use std::pin::Pin;
+use std::task::{Context, Poll};
 use std::{any::Any, sync::Arc};
 
+use arrow::error::Result as ArrowResult;
 use arrow::{
     datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use futures::StreamExt;
+use futures::{Stream, StreamExt};
 use itertools::Itertools;
 use log::debug;
+use log::warn;
 
 use super::{
     expressions::PhysicalSortExpr,
@@ -42,6 +46,8 @@ use crate::{
     error::Result,
     physical_plan::{expressions, metrics::BaselineMetrics},
 };
+use datafusion_physical_expr::sort_expr_list_eq_strict_order;
+use tokio::macros::support::thread_rng_n;
 
 /// UNION ALL execution plan
 #[derive(Debug)]
@@ -52,6 +58,8 @@ pub struct UnionExec {
     metrics: ExecutionPlanMetricsSet,
     /// Schema of Union
     schema: SchemaRef,
+    /// Whether the Union is partition aware
+    partition_aware: bool,
 }
 
 impl UnionExec {
@@ -78,10 +86,24 @@ impl UnionExec {
             inputs[0].schema().metadata().clone(),
         ));
 
+        // If all the inputs have the same Hash partition spec as the first input,
+        // the UnionExec is partition aware.
+        //
+        // This might be too strict when the input partition specs are compatible but not exactly the same.
+        // For example, if one input has the partition spec Hash('a','b','c') and
+        // another has the spec Hash('a'), it is safe to derive the output partition spec Hash('a','b','c').
+        let first_input_partition = inputs[0].output_partitioning();
+        let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _))
+            && inputs
+                .iter()
+                .map(|plan| plan.output_partitioning())
+                .all(|partition| partition == first_input_partition);
+
         UnionExec {
             inputs,
             metrics: ExecutionPlanMetricsSet::new(),
             schema,
+            partition_aware,
         }
     }
 
@@ -107,23 +129,46 @@ impl ExecutionPlan for UnionExec {
 
     /// Output of the union is the combination of all output partitions of the inputs
     fn output_partitioning(&self) -> Partitioning {
-        // Sums all the output partitions
-        let num_partitions = self
-            .inputs
-            .iter()
-            .map(|plan| plan.output_partitioning().partition_count())
-            .sum();
-        // TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`)
-        // https://issues.apache.org/jira/browse/ARROW-11991
-        Partitioning::UnknownPartitioning(num_partitions)
+        if self.partition_aware {
+            self.inputs[0].output_partitioning()
+        } else {
+            // Output the combination of all output partitions of the inputs if the Union is not partition aware
+            let num_partitions = self
+                .inputs
+                .iter()
+                .map(|plan| plan.output_partitioning().partition_count())
+                .sum();
+
+            Partitioning::UnknownPartitioning(num_partitions)
+        }
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn relies_on_input_order(&self) -> bool {
-        false
+        let first_input_ordering = self.inputs[0].output_ordering();
+        // If the Union is not partition aware and every input ordering spec strictly equals
+        // the first_input_ordering, return the first_input_ordering as the output_ordering.
+        //
+        // This might be too strict when the input orderings are compatible but not exactly the same.
+        // For example, if one input has the ordering spec SortExpr('a','b','c') and the other has
+        // the spec SortExpr('a'), it is safe to derive the output ordering SortExpr('a').
+        if !self.partition_aware
+            && first_input_ordering.is_some()
+            && self
+                .inputs
+                .iter()
+                .map(|plan| plan.output_ordering())
+                .all(|ordering| {
+                    ordering.is_some()
+                        && sort_expr_list_eq_strict_order(
+                            ordering.unwrap(),
+                            first_input_ordering.unwrap(),
+                        )
+                })
+        {
+            first_input_ordering
+        } else {
+            None
+        }
     }
 
     fn with_new_children(
@@ -145,19 +190,38 @@ impl ExecutionPlan for UnionExec {
         let elapsed_compute = baseline_metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer(); // record on drop
 
-        // find partition to execute
-        for input in self.inputs.iter() {
-            // Calculate whether partition belongs to the current partition
-            if partition < input.output_partitioning().partition_count() {
-                let stream = input.execute(partition, context)?;
-                debug!("Found a Union partition to execute");
+        if self.partition_aware {
+            let mut input_stream_vec = vec![];
+            for input in self.inputs.iter() {
+                if partition < input.output_partitioning().partition_count() {
+                    input_stream_vec.push(input.execute(partition, context.clone())?);
+                } else {
+                    // Did not find a partition to execute for this input
+                    break;
+                }
+            }
+            if input_stream_vec.len() == self.inputs.len() {
+                let stream = Box::pin(CombinedRecordBatchStream::new(
+                    self.schema(),
+                    input_stream_vec,
+                ));
                 return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
-            } else {
-                partition -= input.output_partitioning().partition_count();
+            }
+        } else {
+            // find partition to execute
+            for input in self.inputs.iter() {
+                // Calculate whether partition belongs to the current partition
+                if partition < input.output_partitioning().partition_count() {
+                    let stream = input.execute(partition, context)?;
+                    debug!("Found a Union partition to execute");
+                    return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+                } else {
+                    partition -= input.output_partitioning().partition_count();
+                }
             }
         }
 
-        debug!("Error in Union: Partition {} not found", partition);
+        warn!("Error in Union: Partition {} not found", partition);
 
         Err(crate::error::DataFusionError::Execution(format!(
             "Partition {} not found in Union",
@@ -194,6 +258,73 @@ impl ExecutionPlan for UnionExec {
     }
 }
 
+/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
+pub struct CombinedRecordBatchStream {
+    /// Schema wrapped by Arc
+    schema: SchemaRef,
+    /// Stream entries
+    entries: Vec<SendableRecordBatchStream>,
+}
+
+impl CombinedRecordBatchStream {
+    /// Create a CombinedRecordBatchStream
+    pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self {
+        Self { schema, entries }
+    }
+}
+
+impl RecordBatchStream for CombinedRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for CombinedRecordBatchStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+
+        let start = thread_rng_n(self.entries.len() as u32) as usize;
+        let mut idx = start;
+
+        for _ in 0..self.entries.len() {
+            let stream = self.entries.get_mut(idx).unwrap();
+
+            match Pin::new(stream).poll_next(cx) {
+                Ready(Some(val)) => return Ready(Some(val)),
+                Ready(None) => {
+                    // Remove the entry
+                    self.entries.swap_remove(idx);
+
+                    // Check if this was the last entry, if so the cursor needs
+                    // to wrap
+                    if idx == self.entries.len() {
+                        idx = 0;
+                    } else if idx < start && start <= self.entries.len() {
+                        // The stream being swapped into the current index has
+                        // already been polled, so skip it.
+                        idx = idx.wrapping_add(1) % self.entries.len();
+                    }
+                }
+                Pending => {
+                    idx = idx.wrapping_add(1) % self.entries.len();
+                }
+            }
+        }
+
+        // If no entries remain, the stream is complete.
+        if self.entries.is_empty() {
+            Ready(None)
+        } else {
+            Pending
+        }
+    }
+}
+
 /// Stream wrapper that records `BaselineMetrics` for a particular
 /// partition
 struct ObservedStream {
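A hedged usage sketch of `CombinedRecordBatchStream`, merging two single-partition in-memory streams into one (assumes a tokio runtime and the existing `MemoryStream` helper; batch contents are illustrative). Note that batches are interleaved in whatever order the child streams become ready, since polling starts at a random entry via `thread_rng_n`:

    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::physical_plan::union::CombinedRecordBatchStream;
    use datafusion::physical_plan::{common, SendableRecordBatchStream};

    #[tokio::main]
    async fn main() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;
        let s1: SendableRecordBatchStream =
            Box::pin(MemoryStream::try_new(vec![batch.clone()], schema.clone(), None)?);
        let s2: SendableRecordBatchStream =
            Box::pin(MemoryStream::try_new(vec![batch], schema.clone(), None)?);

        let combined = CombinedRecordBatchStream::new(schema, vec![s1, s2]);
        let batches = common::collect(Box::pin(combined)).await?;
        assert_eq!(batches.len(), 2); // one batch from each child stream
        Ok(())
    }
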
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index 897936814..6ab4f7b82 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -22,8 +22,8 @@ use super::{common, SendableRecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::{
-    memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan,
-    Partitioning, PhysicalExpr,
+    memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
+    PhysicalExpr,
 };
 use crate::scalar::ScalarValue;
 use arrow::array::new_null_array;
@@ -108,11 +108,6 @@ impl ExecutionPlan for ValuesExec {
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![]
     }
-
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
-    }
-
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
@@ -122,10 +117,6 @@ impl ExecutionPlan for ValuesExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index bece2a50c..a488c6ffa 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -217,6 +217,8 @@ mod tests {
             ],
             input,
             schema.clone(),
+            vec![],
+            None,
         )?);
 
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
@@ -262,6 +264,8 @@ mod tests {
             )?],
             blocking_exec,
             schema,
+            vec![],
+            None,
         )?);
 
         let fut = collect(window_agg_exec, task_ctx);
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index e9eac35a3..76ad0afb1 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,8 +24,9 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
-    Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+    common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+    SendableRecordBatchStream, Statistics, WindowExpr,
 };
 use arrow::{
     array::ArrayRef,
@@ -35,6 +36,7 @@ use arrow::{
 };
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
+use log::warn;
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -51,6 +53,10 @@ pub struct WindowAggExec {
     schema: SchemaRef,
     /// Schema before the window
     input_schema: SchemaRef,
+    /// Partition Keys
+    pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+    /// Sort Keys
+    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -61,6 +67,8 @@ impl WindowAggExec {
         window_expr: Vec<Arc<dyn WindowExpr>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
+        partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+        sort_keys: Option<Vec<PhysicalSortExpr>>,
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
@@ -69,6 +77,8 @@ impl WindowAggExec {
             window_expr,
             schema,
             input_schema,
+            partition_keys,
+            sort_keys,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");
+            vec![Distribution::SinglePartition]
         } else {
-            Distribution::UnspecifiedDistribution
+            // TODO: support PartitionCollections if there are no common partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]
         }
     }
 
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -143,6 +156,8 @@ impl ExecutionPlan for WindowAggExec {
             self.window_expr.clone(),
             children[0].clone(),
             self.input_schema.clone(),
+            self.partition_keys.clone(),
+            self.sort_keys.clone(),
         )?))
     }
 
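A standalone sketch (not the method itself) of how the distribution requirement now follows from the planner-supplied partition keys:

    use std::sync::Arc;
    use datafusion::physical_plan::{Distribution, PhysicalExpr};
    use datafusion_physical_expr::expressions::Column;

    // Mirrors WindowAggExec::required_input_distribution: with no
    // PARTITION BY keys the window must see a single partition;
    // otherwise it asks for hash partitioning on those keys.
    fn window_required_distribution(keys: Vec<Arc<dyn PhysicalExpr>>) -> Vec<Distribution> {
        if keys.is_empty() {
            vec![Distribution::SinglePartition]
        } else {
            vec![Distribution::HashPartitioned(keys)]
        }
    }

    fn main() {
        assert!(matches!(
            window_required_distribution(vec![])[0],
            Distribution::SinglePartition
        ));
        let key: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
        assert!(matches!(
            window_required_distribution(vec![key])[0],
            Distribution::HashPartitioned(_)
        ));
    }
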
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
index 20e7c6e79..8ecece85e 100644
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -235,8 +235,8 @@ impl ExecutionPlan for ProxyExecutionPlan {
         self.inner.output_ordering()
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        self.inner.required_child_distribution()
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        self.inner.required_input_distribution()
     }
 
     fn relies_on_input_order(&self) -> bool {
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 7e9ece36b..3fb810e3a 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -441,12 +441,8 @@ impl ExecutionPlan for TopKExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
new file mode 100644
index 000000000..411a492a5
--- /dev/null
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expressions::Column;
+
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+/// Equivalence Properties is a vec of EquivalentClass.
+#[derive(Debug, Default, Clone)]
+pub struct EquivalenceProperties {
+    classes: Vec<EquivalentClass>,
+}
+
+impl EquivalenceProperties {
+    pub fn new() -> Self {
+        EquivalenceProperties { classes: vec![] }
+    }
+
+    pub fn classes(&self) -> &[EquivalentClass] {
+        &self.classes
+    }
+
+    pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I) {
+        self.classes.extend(iter)
+    }
+
+    /// Add new equal conditions into the EquivalenceProperties; new conditions usually come from
+    /// equality predicates in a Join or Filter
+    pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) {
+        let mut idx1: Option<usize> = None;
+        let mut idx2: Option<usize> = None;
+        for (idx, class) in self.classes.iter_mut().enumerate() {
+            let contains_first = class.contains(new_conditions.0);
+            let contains_second = class.contains(new_conditions.1);
+            match (contains_first, contains_second) {
+                (true, false) => {
+                    class.insert(new_conditions.1.clone());
+                    idx1 = Some(idx);
+                }
+                (false, true) => {
+                    class.insert(new_conditions.0.clone());
+                    idx2 = Some(idx);
+                }
+                (true, true) => {
+                    idx1 = Some(idx);
+                    idx2 = Some(idx);
+                    break;
+                }
+                (false, false) => {}
+            }
+        }
+
+        match (idx1, idx2) {
+            (Some(idx_1), Some(idx_2)) if idx_1 != idx_2 => {
+                // need to merge the two existing EquivalentClasses
+                let second_eq_class = self.classes.get(idx_2).unwrap().clone();
+                let first_eq_class = self.classes.get_mut(idx_1).unwrap();
+                for prop in second_eq_class.iter() {
+                    if !first_eq_class.contains(prop) {
+                        first_eq_class.insert(prop.clone());
+                    }
+                }
+                self.classes.remove(idx_2);
+            }
+            (None, None) => {
+                // adding new pairs
+                self.classes.push(EquivalentClass::new(
+                    new_conditions.0.clone(),
+                    vec![new_conditions.1.clone()],
+                ));
+            }
+            _ => {}
+        }
+    }
+
+    pub fn merge_properties_with_alias(
+        &mut self,
+        alias_map: &HashMap<Column, Vec<Column>>,
+    ) {
+        for (column, columns) in alias_map {
+            let mut find_match = false;
+            for class in self.classes.iter_mut() {
+                if class.contains(column) {
+                    for col in columns {
+                        class.insert(col.clone());
+                    }
+                    find_match = true;
+                    break;
+                }
+            }
+            if !find_match {
+                self.classes
+                    .push(EquivalentClass::new(column.clone(), columns.clone()));
+            }
+        }
+    }
+
+    pub fn truncate_properties_not_in_schema(&mut self, schema: &SchemaRef) {
+        for class in self.classes.iter_mut() {
+            let mut columns_to_remove = vec![];
+            for column in class.iter() {
+                if let Ok(idx) = schema.index_of(column.name()) {
+                    if idx != column.index() {
+                        columns_to_remove.push(column.clone());
+                    }
+                } else {
+                    columns_to_remove.push(column.clone());
+                }
+            }
+            for column in columns_to_remove {
+                class.remove(&column);
+            }
+        }
+        self.classes.retain(|props| props.len() > 1);
+    }
+}
+
+/// An EquivalentClass is a set of Columns that are known to have the same value in all tuples of a relation.
+/// Equivalent classes are generated by equality predicates, typically equijoin conditions and equality conditions in filters.
+#[derive(Debug, Clone)]
+pub struct EquivalentClass {
+    /// First element in the EquivalentClass
+    head: Column,
+    /// Other equal columns
+    others: HashSet<Column>,
+}
+
+impl EquivalentClass {
+    pub fn new(head: Column, others: Vec<Column>) -> Self {
+        EquivalentClass {
+            head,
+            others: HashSet::from_iter(others),
+        }
+    }
+
+    pub fn head(&self) -> &Column {
+        &self.head
+    }
+
+    pub fn others(&self) -> &HashSet<Column> {
+        &self.others
+    }
+
+    pub fn contains(&self, col: &Column) -> bool {
+        self.head == *col || self.others.contains(col)
+    }
+
+    pub fn insert(&mut self, col: Column) -> bool {
+        self.others.insert(col)
+    }
+
+    pub fn remove(&mut self, col: &Column) -> bool {
+        let removed = self.others.remove(col);
+        if !removed && *col == self.head {
+            let one_col = self.others.iter().next().cloned();
+            if let Some(col) = one_col {
+                let removed = self.others.remove(&col);
+                self.head = col;
+                removed
+            } else {
+                false
+            }
+        } else {
+            // `removed` is false here when the column was not present at all
+            removed
+        }
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &'_ Column> {
+        std::iter::once(&self.head).chain(self.others.iter())
+    }
+
+    pub fn len(&self) -> usize {
+        self.others.len() + 1
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::Column;
+    use datafusion_common::Result;
+
+    #[test]
+    fn add_equal_conditions_test() -> Result<()> {
+        let mut eq_properties = EquivalenceProperties::new();
+        let new_condition = (&Column::new("a", 0), &Column::new("b", 1));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+
+        let new_condition = (&Column::new("b", 1), &Column::new("a", 0));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 2);
+
+        let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 3);
+
+        let new_condition = (&Column::new("x", 99), &Column::new("y", 100));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 2);
+
+        let new_condition = (&Column::new("x", 99), &Column::new("a", 0));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 5);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merge_equivalence_properties_with_alias_test() -> Result<()> {
+        let mut eq_properties = EquivalenceProperties::new();
+        let mut alias_map = HashMap::new();
+        alias_map.insert(
+            Column::new("a", 0),
+            vec![Column::new("a1", 1), Column::new("a2", 2)],
+        );
+
+        eq_properties.merge_properties_with_alias(&alias_map);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 3);
+
+        let mut alias_map = HashMap::new();
+        alias_map.insert(
+            Column::new("a", 0),
+            vec![Column::new("a3", 1), Column::new("a4", 2)],
+        );
+        eq_properties.merge_properties_with_alias(&alias_map);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 5);
+        Ok(())
+    }
+}
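One behavior the tests above do not exercise is truncate_properties_not_in_schema. A hedged sketch of how a projection-style output schema prunes a class (hypothetical columns; note that a column is also dropped when only its index changes):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::EquivalenceProperties;

    fn main() {
        let mut eq = EquivalenceProperties::new();
        eq.add_equal_conditions((&Column::new("a", 0), &Column::new("b", 1)));
        eq.add_equal_conditions((&Column::new("a", 0), &Column::new("c", 2)));

        // The output schema keeps only (a, c); c moves from index 2 to 1.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));
        eq.truncate_properties_not_in_schema(&schema);

        // b is gone from the schema and c's index changed, so both are
        // removed; the remaining single-column class {a} is discarded.
        assert_eq!(eq.classes().len(), 0);
    }
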
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 450919aa3..776f4dc0a 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -118,6 +118,76 @@ impl PartialEq<dyn Any> for Column {
     }
 }
 
+/// Represents an unknown column without an index
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
+pub struct UnKnownColumn {
+    name: String,
+}
+
+impl UnKnownColumn {
+    /// Create a new unknown column expression
+    pub fn new(name: &str) -> Self {
+        Self {
+            name: name.to_owned(),
+        }
+    }
+
+    /// Get the column name
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl std::fmt::Display for UnKnownColumn {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.name)
+    }
+}
+
+impl PhysicalExpr for UnKnownColumn {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Null)
+    }
+
+    /// Decide whether this expression is nullable, given the schema of the input
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(true)
+    }
+
+    /// Evaluate the expression
+    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
+        Err(DataFusionError::Plan(
+            "UnKnownColumn::evaluate() should not be called".to_owned(),
+        ))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(self)
+    }
+}
+
+impl PartialEq<dyn Any> for UnKnownColumn {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| self == x)
+            .unwrap_or(false)
+    }
+}
+
 #[derive(Debug, Clone)]
 struct ColumnExprStats {
     index: usize,
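UnKnownColumn is a placeholder for a column that no longer resolves against the current schema (see normalize_out_expr_with_alias_schema below): it reports DataType::Null, claims to be nullable, and refuses to evaluate. A minimal sketch of that contract:

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_physical_expr::expressions::UnKnownColumn;
    use datafusion_physical_expr::PhysicalExpr;

    fn main() {
        let unknown = UnKnownColumn::new("b");
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        assert_eq!(unknown.data_type(&schema).unwrap(), DataType::Null);

        // Evaluation is intentionally an error: there is no backing data.
        let batch = RecordBatch::new_empty(Arc::new(schema));
        assert!(unknown.evaluate(&batch).is_err());
    }
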
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index d27737dd9..26bb3ca1e 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -72,7 +72,7 @@ pub use case::{case, CaseExpr};
 pub use cast::{
     cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
 };
-pub use column::{col, Column};
+pub use column::{col, Column, UnKnownColumn};
 pub use datetime::DateTimeIntervalExpr;
 pub use get_indexed_field::GetIndexedFieldExpr;
 pub use in_list::{in_list, InListExpr};
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index d2b899dca..026329e84 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -21,6 +21,7 @@ pub mod conditional_expressions;
 #[cfg(feature = "crypto_expressions")]
 pub mod crypto_expressions;
 pub mod datetime_expressions;
+pub mod equivalence;
 pub mod execution_props;
 pub mod expressions;
 pub mod functions;
@@ -46,7 +47,15 @@ pub mod window;
 // reexport this to maintain compatibility with anything that used from_slice previously
 pub use aggregate::AggregateExpr;
 pub use datafusion_common::from_slice;
+pub use equivalence::EquivalenceProperties;
+pub use equivalence::EquivalentClass;
 pub use physical_expr::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::PhysicalSortExpr;
+pub use utils::{
+    expr_list_eq_any_order, expr_list_eq_strict_order,
+    normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema,
+    normalize_sort_expr_with_equivalence_properties, sort_expr_list_eq_strict_order,
+    split_conjunction,
+};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 7ee947c0a..78ce9c931 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,9 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::equivalence::EquivalentClass;
+use crate::expressions::BinaryExpr;
+use crate::expressions::Column;
+use crate::expressions::UnKnownColumn;
+use crate::rewrite::TreeNodeRewritable;
 use crate::PhysicalExpr;
 use crate::PhysicalSortExpr;
+use datafusion_expr::Operator;
 
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Compare whether the two expr lists are equal regardless of order.
@@ -65,6 +74,117 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assuming the predicate is in CNF, split it into a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+pub fn split_conjunction(
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> Vec<&Arc<dyn PhysicalExpr>> {
+    split_conjunction_impl(predicate, vec![])
+}
+
+fn split_conjunction_impl<'a>(
+    predicate: &'a Arc<dyn PhysicalExpr>,
+    mut exprs: Vec<&'a Arc<dyn PhysicalExpr>>,
+) -> Vec<&'a Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let exprs = split_conjunction_impl(binary.left(), exprs);
+                split_conjunction_impl(binary.right(), exprs)
+            }
+            _ => {
+                exprs.push(predicate);
+                exprs
+            }
+        },
+        None => {
+            exprs.push(predicate);
+            exprs
+        }
+    }
+}
+
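A short usage sketch for split_conjunction (standalone snippet; the predicate is built by hand from the expressions this crate already exports):

    use std::sync::Arc;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column};
    use datafusion_physical_expr::{split_conjunction, PhysicalExpr};

    fn main() {
        // Build "a = b AND c = d".
        let a_eq_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Column::new("b", 1)),
        ));
        let c_eq_d: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 2)),
            Operator::Eq,
            Arc::new(Column::new("d", 3)),
        ));
        let predicate: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(a_eq_b, Operator::And, c_eq_d));

        // AND nodes are flattened; each conjunct comes back separately.
        assert_eq!(split_conjunction(&predicate).len(), 2);
    }
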
+/// Normalize the output expressions based on the alias map and SchemaRef.
+///
+/// 1) If there is a mapping in the alias map, replace the Column in the output expressions with the first Column in the alias map entry
+/// 2) If the Column is invalid for the current schema, replace it with the placeholder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exact match: return None, no transform needed
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
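A hedged sketch of the first rule: for a projection `SELECT a AS a1`, the input-side column a is rewritten to its alias before being reported as an output property (standalone snippet, illustrative indexes):

    use std::collections::HashMap;
    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::{normalize_out_expr_with_alias_schema, PhysicalExpr};

    fn main() {
        // Output schema of the projection: just (a1).
        let schema = Arc::new(Schema::new(vec![Field::new("a1", DataType::Int32, false)]));
        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
        alias_map.insert(Column::new("a", 0), vec![Column::new("a1", 0)]);

        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
        let normalized = normalize_out_expr_with_alias_schema(expr, &alias_map, &schema);
        let col = normalized.as_any().downcast_ref::<Column>().unwrap();
        assert_eq!(col.name(), "a1");
    }
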
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalentClass],
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| match expr.as_any().downcast_ref::<Column>() {
+            Some(column) => {
+                let mut normalized: Option<Arc<dyn PhysicalExpr>> = None;
+                for class in eq_properties {
+                    if class.contains(column) {
+                        normalized = Some(Arc::new(class.head().clone()));
+                        break;
+                    }
+                }
+                normalized
+            }
+            None => None,
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_sort_expr_with_equivalence_properties(
+    sort_expr: PhysicalSortExpr,
+    eq_properties: &[EquivalentClass],
+) -> PhysicalSortExpr {
+    let normalized_expr =
+        normalize_expr_with_equivalence_properties(sort_expr.expr.clone(), eq_properties);
+
+    if sort_expr.expr.ne(&normalized_expr) {
+        PhysicalSortExpr {
+            expr: normalized_expr,
+            options: sort_expr.options,
+        }
+    } else {
+        sort_expr
+    }
+}
+
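And a companion sketch for the equivalence-based normalization: an ordering on any member of a class normalizes to the class head, which is what lets the enforcer recognize a sort on b as already satisfied when a = b and the input is sorted on a (standalone snippet):

    use std::sync::Arc;
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::{
        normalize_sort_expr_with_equivalence_properties, EquivalentClass, PhysicalExpr,
        PhysicalSortExpr,
    };

    fn main() {
        // One class {a (head), b}, e.g. derived from a filter a = b.
        let class = EquivalentClass::new(Column::new("a", 0), vec![Column::new("b", 1)]);

        // An ordering on b normalizes to the class head, a.
        let sort_on_b = PhysicalSortExpr {
            expr: Arc::new(Column::new("b", 1)),
            options: SortOptions::default(),
        };
        let normalized =
            normalize_sort_expr_with_equivalence_properties(sort_on_b, &[class]);
        let col = normalized.expr.as_any().downcast_ref::<Column>().unwrap();
        assert_eq!(col.name(), "a");
    }
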
 #[cfg(test)]
 mod tests {
 
@@ -77,7 +197,7 @@ mod tests {
     use std::sync::Arc;
 
     #[test]
-    fn expr_list_eq_any_order_test() -> Result<()> {
+    fn expr_list_eq_test() -> Result<()> {
         let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
             Arc::new(Column::new("a", 0)),
             Arc::new(Column::new("a", 0)),
@@ -91,6 +211,15 @@ mod tests {
         assert!(!expr_list_eq_any_order(list1.as_slice(), list2.as_slice()));
         assert!(!expr_list_eq_any_order(list2.as_slice(), list1.as_slice()));
 
+        assert!(!expr_list_eq_strict_order(
+            list1.as_slice(),
+            list2.as_slice()
+        ));
+        assert!(!expr_list_eq_strict_order(
+            list2.as_slice(),
+            list1.as_slice()
+        ));
+
         let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
             Arc::new(Column::new("a", 0)),
             Arc::new(Column::new("b", 1)),
@@ -110,6 +239,17 @@ mod tests {
         assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice()));
         assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice()));
 
+        assert!(!expr_list_eq_strict_order(
+            list3.as_slice(),
+            list4.as_slice()
+        ));
+        assert!(!expr_list_eq_strict_order(
+            list4.as_slice(),
+            list3.as_slice()
+        ));
+        assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice()));
+        assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice()));
+
         Ok(())
     }