You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/04 10:21:13 UTC

[arrow-datafusion] branch main updated: Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements (#5754)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d6c2233e21 Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements (#5754)
d6c2233e21 is described below

commit d6c2233e21b6aa7cfb157e5f36c75934f077c167
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Tue Apr 4 13:21:06 2023 +0300

    Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements (#5754)
    
    * Increase optimizer performance
    
    * Config added.
    
    * Simplifications and comment improvements
    
    * More simplifications
    
    * Revamping tests for unbounded-unbounded cases.
    
    * Review code
    
    * Move SHJ suitability from PipelineFixer to PipelineChecker, further SHJ code simplifications
    
    * Added logging on tests and ensure timeout
    
    * Robust fifo writing in case of slow executions
    
    * Update fifo.rs
    
    * Update fifo.rs
    
    * Update fifo.rs
    
    * Update fifo.rs
    
    * Get rid of locks
    
    * Try exact one batch size
    
    * Update fifo.rs
    
    * Update fifo.rs
    
    * Update fifo.rs
    
    * Ignore FIFO test
    
    * Update config.rs
    
    * Config update
    
    * Update config.rs
    
    * Update configs.md
    
    * Update config
    
    * Update symmetric_hash_join.rs
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/common/src/config.rs                    |   9 +
 datafusion/core/src/execution/context.rs           |   3 -
 .../src/physical_optimizer/pipeline_checker.rs     | 105 ++-
 .../core/src/physical_optimizer/pipeline_fixer.rs  | 119 +--
 .../src/physical_plan/joins/hash_join_utils.rs     |  62 +-
 datafusion/core/src/physical_plan/joins/mod.rs     |   1 -
 .../src/physical_plan/joins/symmetric_hash_join.rs | 830 +++++++++++++--------
 datafusion/core/tests/fifo.rs                      | 400 +++++-----
 .../test_files/information_schema.slt              |   1 +
 datafusion/execution/src/config.rs                 |   6 +
 .../physical-expr/src/intervals/cp_solver.rs       |  22 +-
 datafusion/physical-expr/src/intervals/mod.rs      |   2 +-
 docs/source/user-guide/configs.md                  |  75 +-
 13 files changed, 916 insertions(+), 719 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index fb4c3422cb..55cdc36d20 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -280,6 +280,15 @@ config_namespace! {
         /// using the provided `target_partitions` level
         pub repartition_joins: bool, default = true
 
+        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
+        /// its inputs do not have any ordering or filtering If the flag is not enabled,
+        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
+        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
+        /// RightAnti, and RightSemi - being produced only at the end of the execution.
+        /// This is not typical in stream processing. Additionally, without proper design for
+        /// long runner execution, all types of joins may encounter out-of-memory errors.
+        pub allow_symmetric_joins_without_pruning: bool, default = true
+
         /// When set to true, file groups will be repartitioned to achieve maximum parallelism.
         /// Currently supported only for Parquet format in which case
         /// multiple row groups from the same file may be read concurrently. If false then each
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index c9ad030991..9ff7c66004 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1301,9 +1301,6 @@ impl SessionState {
             // repartitioning and local sorting steps to meet distribution and ordering requirements.
             // Therefore, it should run before EnforceDistribution and EnforceSorting.
             Arc::new(JoinSelection::new()),
-            // Enforce sort before PipelineFixer
-            Arc::new(EnforceDistribution::new()),
-            Arc::new(EnforceSorting::new()),
             // If the query is processing infinite inputs, the PipelineFixer rule applies the
             // necessary transformations to make the query runnable (if it is not already runnable).
             // If the query can not be made runnable, the rule emits an error with a diagnostic message.
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 03e47e6e94..f875329d1a 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -22,8 +22,12 @@
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::joins::SymmetricHashJoinExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_common::config::OptimizerOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::DataFusionError;
+use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
 use std::sync::Arc;
 
 /// The PipelineChecker rule rejects non-runnable query plans that use
@@ -42,10 +46,11 @@ impl PhysicalOptimizerRule for PipelineChecker {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let pipeline = PipelineStatePropagator::new(plan);
-        let state = pipeline.transform_up(&check_finiteness_requirements)?;
+        let state = pipeline
+            .transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))?;
         Ok(state.plan)
     }
 
@@ -128,16 +133,39 @@ impl TreeNode for PipelineStatePropagator {
 /// This function propagates finiteness information and rejects any plan with
 /// pipeline-breaking operators acting on infinite inputs.
 pub fn check_finiteness_requirements(
-    input: PipelineStatePropagator,
+    mut input: PipelineStatePropagator,
+    optimizer_options: &OptimizerOptions,
 ) -> Result<Transformed<PipelineStatePropagator>> {
-    let plan = input.plan;
-    let children = input.children_unbounded;
-    plan.unbounded_output(&children).map(|value| {
-        Transformed::Yes(PipelineStatePropagator {
-            plan,
-            unbounded: value,
-            children_unbounded: children,
+    if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
+        if !(optimizer_options.allow_symmetric_joins_without_pruning
+            || (exec.check_if_order_information_available()? && is_prunable(exec)))
+        {
+            const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
+                               the 'allow_symmetric_joins_without_pruning' configuration flag";
+            return Err(DataFusionError::Plan(MSG.to_owned()));
+        }
+    }
+    input
+        .plan
+        .unbounded_output(&input.children_unbounded)
+        .map(|value| {
+            input.unbounded = value;
+            Transformed::Yes(input)
         })
+}
+
+/// This function returns whether a given symmetric hash join is amenable to
+/// data pruning. For this to be possible, it needs to have a filter where
+/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
+/// interval calculations.
+fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
+    join.filter().map_or(false, |filter| {
+        check_support(filter.expression())
+            && filter
+                .schema()
+                .fields()
+                .iter()
+                .all(|f| is_datatype_supported(f.data_type()))
     })
 }
 
@@ -154,27 +182,19 @@ mod sql_tests {
             source_types: (SourceType::Unbounded, SourceType::Bounded),
             expect_fail: false,
         };
+
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: true,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };
 
@@ -189,26 +209,17 @@ mod sql_tests {
             expect_fail: true,
         };
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: false,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };
 
@@ -223,26 +234,17 @@ mod sql_tests {
             expect_fail: false,
         };
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: false,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };
 
@@ -257,26 +259,17 @@ mod sql_tests {
             expect_fail: true,
         };
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: true,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };
 
@@ -321,7 +314,7 @@ mod sql_tests {
                   FROM test
                   LIMIT 5".to_string(),
             cases: vec![Arc::new(test1), Arc::new(test2)],
-            error_operator: "Sort Error".to_string()
+            error_operator: "Window Error".to_string()
         };
 
         case.run().await?;
@@ -344,7 +337,7 @@ mod sql_tests {
                         SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
                   FROM test".to_string(),
             cases: vec![Arc::new(test1), Arc::new(test2)],
-            error_operator: "Sort Error".to_string()
+            error_operator: "Window Error".to_string()
         };
         case.run().await?;
         Ok(())
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index 478f840599..8ca36c9bea 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -25,26 +25,17 @@
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::join_selection::swap_hash_join;
-use crate::physical_optimizer::pipeline_checker::{
-    check_finiteness_requirements, PipelineStatePropagator,
-};
+use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::utils::JoinSide;
-use crate::physical_plan::joins::{
-    convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
-    SymmetricHashJoinExec,
-};
+use crate::physical_plan::joins::{HashJoinExec, PartitionMode, SymmetricHashJoinExec};
 use crate::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::JoinType;
-use datafusion_physical_expr::expressions::{BinaryExpr, CastExpr, Column, Literal};
-use datafusion_physical_expr::intervals::{is_datatype_supported, is_operator_supported};
-use datafusion_physical_expr::PhysicalExpr;
 
 use std::sync::Arc;
 
-/// The [PipelineFixer] rule tries to modify a given plan so that it can
+/// The [`PipelineFixer`] rule tries to modify a given plan so that it can
 /// accommodate its infinite sources, if there are any. If this is not
 /// possible, the rule emits a diagnostic error message.
 #[derive(Default)]
@@ -56,10 +47,10 @@ impl PipelineFixer {
         Self {}
     }
 }
-/// [PipelineFixer] subrules are functions of this type. Such functions take a
-/// single [PipelineStatePropagator] argument, which stores state variables
-/// indicating the unboundedness status of the current [ExecutionPlan] as
-/// the [PipelineFixer] rule traverses the entire plan tree.
+/// [`PipelineFixer`] subrules are functions of this type. Such functions take a
+/// single [`PipelineStatePropagator`] argument, which stores state variables
+/// indicating the unboundedness status of the current [`ExecutionPlan`] as
+/// the `PipelineFixer` rule traverses the entire plan tree.
 type PipelineFixerSubrule =
     dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
 
@@ -92,62 +83,6 @@ impl PhysicalOptimizerRule for PipelineFixer {
     }
 }
 
-/// Indicates whether interval arithmetic is supported for the given expression.
-/// Currently, we do not support all [PhysicalExpr]s for interval calculations.
-/// We do not support every type of [Operator]s either. Over time, this check
-/// will relax as more types of [PhysicalExpr]s and [Operator]s are supported.
-/// Currently, [CastExpr], [BinaryExpr], [Column] and [Literal] is supported.
-fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
-    let expr_any = expr.as_any();
-    let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
-    {
-        is_operator_supported(binary_expr.op())
-    } else {
-        expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
-    };
-    expr_supported && expr.children().iter().all(check_support)
-}
-
-/// This function returns whether a given hash join is replaceable by a
-/// symmetric hash join. Basically, the requirement is that involved
-/// [PhysicalExpr]s, [Operator]s and data types need to be supported,
-/// and order information must cover every column in the filter expression.
-fn is_suitable_for_symmetric_hash_join(hash_join: &HashJoinExec) -> Result<bool> {
-    if let Some(filter) = hash_join.filter() {
-        let left = hash_join.left();
-        if let Some(left_ordering) = left.output_ordering() {
-            let right = hash_join.right();
-            if let Some(right_ordering) = right.output_ordering() {
-                let expr_supported = check_support(filter.expression());
-                let left_convertible = convert_sort_expr_with_filter_schema(
-                    &JoinSide::Left,
-                    filter,
-                    &left.schema(),
-                    &left_ordering[0],
-                )?
-                .is_some();
-                let right_convertible = convert_sort_expr_with_filter_schema(
-                    &JoinSide::Right,
-                    filter,
-                    &right.schema(),
-                    &right_ordering[0],
-                )?
-                .is_some();
-                let fields_supported = filter
-                    .schema()
-                    .fields()
-                    .iter()
-                    .all(|f| is_datatype_supported(f.data_type()));
-                return Ok(expr_supported
-                    && fields_supported
-                    && left_convertible
-                    && right_convertible);
-            }
-        }
-    }
-    Ok(false)
-}
-
 /// This subrule checks if one can replace a hash join with a symmetric hash
 /// join so that the pipeline does not break due to the join operation in
 /// question. If possible, it makes this replacement; otherwise, it has no
@@ -160,23 +95,19 @@ fn hash_join_convert_symmetric_subrule(
         let ub_flags = input.children_unbounded;
         let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
         let new_plan = if left_unbounded && right_unbounded {
-            match is_suitable_for_symmetric_hash_join(hash_join) {
-                Ok(true) => SymmetricHashJoinExec::try_new(
-                    hash_join.left().clone(),
-                    hash_join.right().clone(),
-                    hash_join
-                        .on()
-                        .iter()
-                        .map(|(l, r)| (l.clone(), r.clone()))
-                        .collect(),
-                    hash_join.filter().unwrap().clone(),
-                    hash_join.join_type(),
-                    hash_join.null_equals_null(),
-                )
-                .map(|e| Arc::new(e) as _),
-                Ok(false) => Ok(plan),
-                Err(e) => return Some(Err(e)),
-            }
+            SymmetricHashJoinExec::try_new(
+                hash_join.left().clone(),
+                hash_join.right().clone(),
+                hash_join
+                    .on()
+                    .iter()
+                    .map(|(l, r)| (l.clone(), r.clone()))
+                    .collect(),
+                hash_join.filter().cloned(),
+                hash_join.join_type(),
+                hash_join.null_equals_null(),
+            )
+            .map(|e| Arc::new(e) as _)
         } else {
             Ok(plan)
         };
@@ -298,14 +229,20 @@ fn apply_subrules_and_check_finiteness_requirements(
             input = value;
         }
     }
-    check_finiteness_requirements(input)
+    input
+        .plan
+        .unbounded_output(&input.children_unbounded)
+        .map(|value| {
+            input.unbounded = value;
+            Transformed::Yes(input)
+        })
 }
 
 #[cfg(test)]
 mod util_tests {
-    use crate::physical_optimizer::pipeline_fixer::check_support;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
+    use datafusion_physical_expr::intervals::check_support;
     use datafusion_physical_expr::PhysicalExpr;
     use std::sync::Arc;
 
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index a3e8946ea6..ffba78c1d3 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -25,7 +25,6 @@ use std::usize;
 use arrow::datatypes::SchemaRef;
 
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::DataFusionError;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::intervals::Interval;
 use datafusion_physical_expr::utils::collect_columns;
@@ -77,19 +76,21 @@ pub fn map_origin_col_to_filter_col(
     Ok(col_to_col_map)
 }
 
-/// This function analyzes [PhysicalSortExpr] graphs with respect to monotonicity
+/// This function analyzes [`PhysicalSortExpr`] graphs with respect to monotonicity
 /// (sorting) properties. This is necessary since monotonically increasing and/or
 /// decreasing expressions are required when using join filter expressions for
 /// data pruning purposes.
 ///
 /// The method works as follows:
-/// 1. Maps the original columns to the filter columns using the `map_origin_col_to_filter_col` function.
-/// 2. Collects all columns in the sort expression using the `PhysicalExprColumnCollector` visitor.
-/// 3. Checks if all columns are included in the `column_mapping_information` map.
-/// 4. If all columns are included, the sort expression is converted into a filter expression using the `transform_up` and `convert_filter_columns` functions.
-/// 5. Searches the converted filter expression in the filter expression using the `check_filter_expr_contains_sort_information`.
-/// 6. If an exact match is encountered, returns the converted filter expression as `Some(Arc<dyn PhysicalExpr>)`.
-/// 7. If all columns are not included or the exact match is not encountered, returns `None`.
+/// 1. Maps the original columns to the filter columns using the [`map_origin_col_to_filter_col`] function.
+/// 2. Collects all columns in the sort expression using the [`collect_columns`] function.
+/// 3. Checks if all columns are included in the map we obtain in the first step.
+/// 4. If all columns are included, the sort expression is converted into a filter expression using
+///    the [`convert_filter_columns`] function.
+/// 5. Searches for the converted filter expression in the filter expression using the
+///    [`check_filter_expr_contains_sort_information`] function.
+/// 6. If an exact match is found, returns the converted filter expression as [`Some(Arc<dyn PhysicalExpr>)`].
+/// 7. If all columns are not included or an exact match is not found, returns [`None`].
 ///
 /// Examples:
 /// Consider the filter expression "a + b > c + 10 AND a + b < c + 100".
@@ -135,28 +136,21 @@ pub fn convert_sort_expr_with_filter_schema(
 
 /// This function is used to build the filter expression based on the sort order of input columns.
 ///
-/// It first calls the [convert_sort_expr_with_filter_schema] method to determine if the sort
-/// order of columns can be used in the filter expression. If it returns a [Some] value, the
-/// method wraps the result in a [SortedFilterExpr] instance with the original sort expression and
+/// It first calls the [`convert_sort_expr_with_filter_schema`] method to determine if the sort
+/// order of columns can be used in the filter expression. If it returns a [`Some`] value, the
+/// method wraps the result in a [`SortedFilterExpr`] instance with the original sort expression and
 /// the converted filter expression. Otherwise, this function returns an error.
 ///
-/// The [SortedFilterExpr] instance contains information about the sort order of columns that can
+/// The `SortedFilterExpr` instance contains information about the sort order of columns that can
 /// be used in the filter expression, which can be used to optimize the query execution process.
 pub fn build_filter_input_order(
     side: JoinSide,
     filter: &JoinFilter,
     schema: &SchemaRef,
     order: &PhysicalSortExpr,
-) -> Result<SortedFilterExpr> {
-    if let Some(expr) =
-        convert_sort_expr_with_filter_schema(&side, filter, schema, order)?
-    {
-        Ok(SortedFilterExpr::new(order.clone(), expr))
-    } else {
-        Err(DataFusionError::Plan(format!(
-            "The {side} side of the join does not have an expression sorted."
-        )))
-    }
+) -> Result<Option<SortedFilterExpr>> {
+    let opt_expr = convert_sort_expr_with_filter_schema(&side, filter, schema, order)?;
+    Ok(opt_expr.map(|filter_expr| SortedFilterExpr::new(order.clone(), filter_expr)))
 }
 
 /// Convert a physical expression into a filter expression using the given
@@ -364,7 +358,8 @@ pub mod tests {
             &filter,
             &Arc::new(left_child_schema),
             &left_child_sort_expr,
-        )?;
+        )?
+        .unwrap();
         assert!(left_child_sort_expr.eq(left_sort_filter_expr.origin_sorted_expr()));
 
         let right_sort_filter_expr = build_filter_input_order(
@@ -372,7 +367,8 @@ pub mod tests {
             &filter,
             &Arc::new(right_child_schema),
             &right_child_sort_expr,
-        )?;
+        )?
+        .unwrap();
         assert!(right_child_sort_expr.eq(right_sort_filter_expr.origin_sorted_expr()));
 
         // Assert that adjusted (left) filter expression matches with `left_child_sort_expr`:
@@ -495,8 +491,8 @@ pub mod tests {
                 expr: col("la1", left_schema.as_ref())?,
                 options: SortOptions::default(),
             }
-        )
-        .is_ok());
+        )?
+        .is_some());
         assert!(build_filter_input_order(
             JoinSide::Left,
             &filter,
@@ -505,8 +501,8 @@ pub mod tests {
                 expr: col("lt1", left_schema.as_ref())?,
                 options: SortOptions::default(),
             }
-        )
-        .is_err());
+        )?
+        .is_none());
         assert!(build_filter_input_order(
             JoinSide::Right,
             &filter,
@@ -515,8 +511,8 @@ pub mod tests {
                 expr: col("ra1", right_schema.as_ref())?,
                 options: SortOptions::default(),
             }
-        )
-        .is_ok());
+        )?
+        .is_some());
         assert!(build_filter_input_order(
             JoinSide::Right,
             &filter,
@@ -525,8 +521,8 @@ pub mod tests {
                 expr: col("rb1", right_schema.as_ref())?,
                 options: SortOptions::default(),
             }
-        )
-        .is_err());
+        )?
+        .is_none());
 
         Ok(())
     }
diff --git a/datafusion/core/src/physical_plan/joins/mod.rs b/datafusion/core/src/physical_plan/joins/mod.rs
index 8ad50514f0..0a1bc147b8 100644
--- a/datafusion/core/src/physical_plan/joins/mod.rs
+++ b/datafusion/core/src/physical_plan/joins/mod.rs
@@ -19,7 +19,6 @@
 
 pub use cross_join::CrossJoinExec;
 pub use hash_join::HashJoinExec;
-pub use hash_join_utils::convert_sort_expr_with_filter_schema;
 pub use nested_loop_join::NestedLoopJoinExec;
 // Note: SortMergeJoin is not used in plans yet
 pub use sort_merge_join::SortMergeJoinExec;
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 5a249e433d..dafd0bfd49 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -43,16 +43,15 @@ use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use futures::{Stream, StreamExt};
 use hashbrown::{raw::RawTable, HashSet};
+use parking_lot::Mutex;
 
 use datafusion_common::{utils::bisect, ScalarValue};
 use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
-use datafusion_physical_expr::{
-    make_sort_requirements_from_exprs, PhysicalSortRequirement,
-};
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::logical_expr::JoinType;
+use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
 use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
@@ -155,6 +154,7 @@ use crate::physical_plan::{
 /// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending)
 /// than that can be dropped from the inner buffer.
 /// ```
+#[derive(Debug)]
 pub struct SymmetricHashJoinExec {
     /// Left side stream
     pub(crate) left: Arc<dyn ExecutionPlan>,
@@ -163,17 +163,11 @@ pub struct SymmetricHashJoinExec {
     /// Set of common columns used to join on
     pub(crate) on: Vec<(Column, Column)>,
     /// Filters applied when finding matching rows
-    pub(crate) filter: JoinFilter,
+    pub(crate) filter: Option<JoinFilter>,
     /// How the join is performed
     pub(crate) join_type: JoinType,
-    /// Order information of filter expressions
-    sorted_filter_exprs: Vec<SortedFilterExpr>,
-    /// Left required sort
-    left_required_sort_exprs: Vec<PhysicalSortExpr>,
-    /// Right required sort
-    right_required_sort_exprs: Vec<PhysicalSortExpr>,
-    /// Expression graph for interval calculations
-    physical_expr_graph: ExprIntervalGraph,
+    /// Expression graph and `SortedFilterExpr`s for interval calculations
+    filter_state: Option<Arc<Mutex<IntervalCalculatorInnerState>>>,
     /// The schema once the join is applied
     schema: SchemaRef,
     /// Shares the `RandomState` for the hashing algorithm
@@ -186,6 +180,19 @@ pub struct SymmetricHashJoinExec {
     pub(crate) null_equals_null: bool,
 }
 
+struct IntervalCalculatorInnerState {
+    /// Expression graph for interval calculations
+    graph: Option<ExprIntervalGraph>,
+    sorted_exprs: Vec<Option<SortedFilterExpr>>,
+    calculated: bool,
+}
+
+impl Debug for IntervalCalculatorInnerState {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "Exprs({:?})", self.sorted_exprs)
+    }
+}
+
 #[derive(Debug)]
 struct SymmetricHashJoinSideMetrics {
     /// Number of batches consumed by this operator
@@ -250,7 +257,7 @@ impl SymmetricHashJoinExec {
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
         on: JoinOn,
-        filter: JoinFilter,
+        filter: Option<JoinFilter>,
         join_type: &JoinType,
         null_equals_null: bool,
     ) -> Result<Self> {
@@ -271,83 +278,27 @@ impl SymmetricHashJoinExec {
         let (schema, column_indices) =
             build_join_schema(&left_schema, &right_schema, join_type);
 
-        // Set a random state for the join:
+        // Initialize the random state for the join operation:
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
 
-        // Create an expression DAG for the join filter:
-        let mut physical_expr_graph =
-            ExprIntervalGraph::try_new(filter.expression().clone())?;
-
-        // Interval calculations require each column to exhibit monotonicity
-        // independently. However, a `PhysicalSortExpr` object defines a
-        // lexicographical ordering, so we can only use their first elements.
-        // when deducing column monotonicities.
-        // TODO: Extend the `PhysicalSortExpr` mechanism to express independent
-        //       (i.e. simultaneous) ordering properties of columns.
-        let (left_ordering, right_ordering) = match (
-            left.output_ordering(),
-            right.output_ordering(),
-        ) {
-            (Some([left_ordering, ..]), Some([right_ordering, ..])) => {
-                (left_ordering, right_ordering)
-            }
-            _ => {
-                return Err(DataFusionError::Plan(
-                    "Symmetric hash join requires its children to have an output ordering".to_string(),
-                ));
-            }
+        let filter_state = if filter.is_some() {
+            let inner_state = IntervalCalculatorInnerState {
+                graph: None,
+                sorted_exprs: vec![],
+                calculated: false,
+            };
+            Some(Arc::new(Mutex::new(inner_state)))
+        } else {
+            None
         };
 
-        // Build the sorted filter expression for the left child:
-        let left_filter_expression = build_filter_input_order(
-            JoinSide::Left,
-            &filter,
-            &left.schema(),
-            left_ordering,
-        )?;
-
-        // Build the sorted filter expression for the right child:
-        let right_filter_expression = build_filter_input_order(
-            JoinSide::Right,
-            &filter,
-            &right.schema(),
-            right_ordering,
-        )?;
-
-        // Store the left and right sorted filter expressions in a vector
-        let mut sorted_filter_exprs =
-            vec![left_filter_expression, right_filter_expression];
-
-        // Gather node indices of converted filter expressions in `SortedFilterExpr`
-        // using the filter columns vector:
-        let child_node_indexes = physical_expr_graph.gather_node_indices(
-            &sorted_filter_exprs
-                .iter()
-                .map(|sorted_expr| sorted_expr.filter_expr().clone())
-                .collect::<Vec<_>>(),
-        );
-
-        // Inject calculated node indices into SortedFilterExpr:
-        for (sorted_expr, (_, index)) in sorted_filter_exprs
-            .iter_mut()
-            .zip(child_node_indexes.iter())
-        {
-            sorted_expr.set_node_index(*index);
-        }
-
-        let left_required_sort_exprs = vec![left_ordering.clone()];
-        let right_required_sort_exprs = vec![right_ordering.clone()];
-
         Ok(SymmetricHashJoinExec {
             left,
             right,
             on,
             filter,
             join_type: *join_type,
-            sorted_filter_exprs,
-            left_required_sort_exprs,
-            right_required_sort_exprs,
-            physical_expr_graph,
+            filter_state,
             schema: Arc::new(schema),
             random_state,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -372,8 +323,8 @@ impl SymmetricHashJoinExec {
     }
 
     /// Filters applied before join output
-    pub fn filter(&self) -> &JoinFilter {
-        &self.filter
+    pub fn filter(&self) -> Option<&JoinFilter> {
+        self.filter.as_ref()
     }
 
     /// How the join is performed
@@ -385,11 +336,33 @@ impl SymmetricHashJoinExec {
     pub fn null_equals_null(&self) -> bool {
         self.null_equals_null
     }
-}
 
-impl Debug for SymmetricHashJoinExec {
-    fn fmt(&self, _f: &mut Formatter<'_>) -> fmt::Result {
-        todo!()
+    /// Check if order information covers every column in the filter expression.
+    pub fn check_if_order_information_available(&self) -> Result<bool> {
+        if let Some(filter) = self.filter() {
+            let left = self.left();
+            if let Some(left_ordering) = left.output_ordering() {
+                let right = self.right();
+                if let Some(right_ordering) = right.output_ordering() {
+                    let left_convertible = convert_sort_expr_with_filter_schema(
+                        &JoinSide::Left,
+                        filter,
+                        &left.schema(),
+                        &left_ordering[0],
+                    )?
+                    .is_some();
+                    let right_convertible = convert_sort_expr_with_filter_schema(
+                        &JoinSide::Right,
+                        filter,
+                        &right.schema(),
+                        &right_ordering[0],
+                    )?
+                    .is_some();
+                    return Ok(left_convertible && right_convertible);
+                }
+            }
+        }
+        Ok(false)
     }
 }
 
@@ -402,14 +375,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         self.schema.clone()
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
-        let left_required =
-            make_sort_requirements_from_exprs(&self.left_required_sort_exprs);
-        let right_required =
-            make_sort_requirements_from_exprs(&self.right_required_sort_exprs);
-        vec![Some(left_required), Some(right_required)]
-    }
-
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children.iter().any(|u| *u))
     }
@@ -420,7 +385,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
             .iter()
             .map(|(l, r)| (Arc::new(l.clone()) as _, Arc::new(r.clone()) as _))
             .unzip();
-        // TODO: This will change when we extend collected executions.
+        // TODO:  This will change when we extend collected executions.
         vec![
             if self.left.output_partitioning().partition_count() == 1 {
                 Distribution::SinglePartition
@@ -483,7 +448,10 @@ impl ExecutionPlan for SymmetricHashJoinExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                let display_filter = format!(", filter={:?}", self.filter.expression());
+                let display_filter = self.filter.as_ref().map_or_else(
+                    || "".to_string(),
+                    |f| format!(", filter={:?}", f.expression()),
+                );
                 write!(
                     f,
                     "SymmetricHashJoinExec: join_type={:?}, on={:?}{}",
@@ -507,20 +475,100 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        // If `filter_state` and `filter` are both present, then calculate sorted filter expressions
+        // for both sides, and build an expression graph if one is not already built.
+        let (left_sorted_filter_expr, right_sorted_filter_expr, graph) =
+            match (&self.filter_state, &self.filter) {
+                (Some(interval_state), Some(filter)) => {
+                    // Lock the mutex of the interval state:
+                    let mut filter_state = interval_state.lock();
+                    // If this is the first partition to be invoked, then we need to initialize our state
+                    // (the expression graph for pruning, sorted filter expressions etc.)
+                    if !filter_state.calculated {
+                        // Interval calculations require each column to exhibit monotonicity
+                        // independently. However, a `PhysicalSortExpr` object defines a
+                        // lexicographical ordering, so we can only use their first elements.
+                        // when deducing column monotonicities.
+                        // TODO: Extend the `PhysicalSortExpr` mechanism to express independent
+                        //       (i.e. simultaneous) ordering properties of columns.
+
+                        // Build sorted filter expressions for the left and right join side:
+                        let join_sides = [JoinSide::Left, JoinSide::Right];
+                        let children = [&self.left, &self.right];
+                        for (join_side, child) in join_sides.iter().zip(children.iter()) {
+                            let sorted_expr = child
+                                .output_ordering()
+                                .and_then(|orders| orders.first())
+                                .and_then(|order| {
+                                    build_filter_input_order(
+                                        *join_side,
+                                        filter,
+                                        &child.schema(),
+                                        order,
+                                    )
+                                    .transpose()
+                                })
+                                .transpose()?;
+
+                            filter_state.sorted_exprs.push(sorted_expr);
+                        }
+
+                        // Collect available sorted filter expressions:
+                        let sorted_exprs_size = filter_state.sorted_exprs.len();
+                        let mut sorted_exprs = filter_state
+                            .sorted_exprs
+                            .iter_mut()
+                            .flatten()
+                            .collect::<Vec<_>>();
+
+                        // Create the expression graph if we can create sorted filter expressions for both children:
+                        filter_state.graph = if sorted_exprs.len() == sorted_exprs_size {
+                            let mut graph =
+                                ExprIntervalGraph::try_new(filter.expression().clone())?;
+
+                            // Gather filter expressions:
+                            let filter_exprs = sorted_exprs
+                                .iter()
+                                .map(|sorted_expr| sorted_expr.filter_expr().clone())
+                                .collect::<Vec<_>>();
+
+                            // Gather node indices of converted filter expressions in `SortedFilterExpr`s
+                            // using the filter columns vector:
+                            let child_node_indices =
+                                graph.gather_node_indices(&filter_exprs);
+
+                            // Update SortedFilterExpr instances with the corresponding node indices:
+                            for (sorted_expr, (_, index)) in
+                                sorted_exprs.iter_mut().zip(child_node_indices.iter())
+                            {
+                                sorted_expr.set_node_index(*index);
+                            }
+
+                            Some(graph)
+                        } else {
+                            None
+                        };
+                        filter_state.calculated = true;
+                    }
+                    // Return the sorted filter expressions for both sides along with the expression graph:
+                    (
+                        filter_state.sorted_exprs[0].clone(),
+                        filter_state.sorted_exprs[1].clone(),
+                        filter_state.graph.as_ref().cloned(),
+                    )
+                }
+                // If `filter_state` or `filter` is not present, then return None for all three values:
+                (_, _) => (None, None, None),
+            };
+
         let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>();
         let on_right = self.on.iter().map(|on| on.1.clone()).collect::<Vec<_>>();
-        let left_side_joiner = OneSideHashJoiner::new(
-            JoinSide::Left,
-            self.sorted_filter_exprs[0].clone(),
-            on_left,
-            self.left.schema(),
-        );
-        let right_side_joiner = OneSideHashJoiner::new(
-            JoinSide::Right,
-            self.sorted_filter_exprs[1].clone(),
-            on_right,
-            self.right.schema(),
-        );
+
+        let left_side_joiner =
+            OneSideHashJoiner::new(JoinSide::Left, on_left, self.left.schema());
+        let right_side_joiner =
+            OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema());
+
         let left_stream = self.left.execute(partition, context.clone())?;
         let right_stream = self.right.execute(partition, context)?;
 
@@ -535,7 +583,9 @@ impl ExecutionPlan for SymmetricHashJoinExec {
             right: right_side_joiner,
             column_indices: self.column_indices.clone(),
             metrics: SymmetricHashJoinMetrics::new(partition, &self.metrics),
-            physical_expr_graph: self.physical_expr_graph.clone(),
+            graph,
+            left_sorted_filter_expr,
+            right_sorted_filter_expr,
             null_equals_null: self.null_equals_null,
             final_result: false,
             probe_side: JoinSide::Left,
@@ -552,7 +602,7 @@ struct SymmetricHashJoinStream {
     /// Input schema
     schema: Arc<Schema>,
     /// join filter
-    filter: JoinFilter,
+    filter: Option<JoinFilter>,
     /// type of the join
     join_type: JoinType,
     // left hash joiner
@@ -561,8 +611,12 @@ struct SymmetricHashJoinStream {
     right: OneSideHashJoiner,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
-    // Range pruner.
-    physical_expr_graph: ExprIntervalGraph,
+    // Expression graph for range pruning.
+    graph: Option<ExprIntervalGraph>,
+    // Left globally sorted filter expr
+    left_sorted_filter_expr: Option<SortedFilterExpr>,
+    // Right globally sorted filter expr
+    right_sorted_filter_expr: Option<SortedFilterExpr>,
     /// Random state used for hashing initialization
     random_state: RandomState,
     /// If null_equals_null is true, null == null else null != null
@@ -696,43 +750,49 @@ fn calculate_filter_expr_intervals(
     if build_input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
         return Ok(());
     }
-    // Evaluate build side filter expression and convert the result to an array
-    let build_array = build_sorted_filter_expr
-        .origin_sorted_expr()
-        .expr
-        .evaluate(&build_input_buffer.slice(0, 1))?
-        .into_array(1);
-    // Evaluate probe side filter expression and convert the result to an array
-    let probe_array = probe_sorted_filter_expr
+    // Calculate the interval for the build side filter expression (if present):
+    update_filter_expr_interval(
+        &build_input_buffer.slice(0, 1),
+        build_sorted_filter_expr,
+    )?;
+    // Calculate the interval for the probe side filter expression (if present):
+    update_filter_expr_interval(
+        &probe_batch.slice(probe_batch.num_rows() - 1, 1),
+        probe_sorted_filter_expr,
+    )
+}
+
+/// This is a subroutine of the function [`calculate_filter_expr_intervals`].
+/// It constructs the current interval using the given `batch` and updates
+/// the filter expression (i.e. `sorted_expr`) with this interval.
+fn update_filter_expr_interval(
+    batch: &RecordBatch,
+    sorted_expr: &mut SortedFilterExpr,
+) -> Result<()> {
+    // Evaluate the filter expression and convert the result to an array:
+    let array = sorted_expr
         .origin_sorted_expr()
         .expr
-        .evaluate(&probe_batch.slice(probe_batch.num_rows() - 1, 1))?
+        .evaluate(batch)?
         .into_array(1);
-
-    // Update intervals for both build and probe side filter expressions
-    for (array, sorted_expr) in vec![
-        (build_array, build_sorted_filter_expr),
-        (probe_array, probe_sorted_filter_expr),
-    ] {
-        // Convert the array to a ScalarValue:
-        let value = ScalarValue::try_from_array(&array, 0)?;
-        // Create a ScalarValue representing positive or negative infinity for the same data type:
-        let infinite = ScalarValue::try_from(value.get_datatype())?;
-        // Update the interval with lower and upper bounds based on the sort option
-        sorted_expr.set_interval(
-            if sorted_expr.origin_sorted_expr().options.descending {
-                Interval {
-                    lower: infinite,
-                    upper: value,
-                }
-            } else {
-                Interval {
-                    lower: value,
-                    upper: infinite,
-                }
-            },
-        );
-    }
+    // Convert the array to a ScalarValue:
+    let value = ScalarValue::try_from_array(&array, 0)?;
+    // Create a ScalarValue representing positive or negative infinity for the same data type:
+    let infinite = ScalarValue::try_from(value.get_datatype())?;
+    // Update the interval with lower and upper bounds based on the sort option:
+    let interval = if sorted_expr.origin_sorted_expr().options.descending {
+        Interval {
+            lower: infinite,
+            upper: value,
+        }
+    } else {
+        Interval {
+            lower: value,
+            upper: infinite,
+        }
+    };
+    // Set the calculated interval for the sorted filter expression:
+    sorted_expr.set_interval(interval);
     Ok(())
 }
 
@@ -950,8 +1010,6 @@ where
 struct OneSideHashJoiner {
     /// Build side
     build_side: JoinSide,
-    /// Build side filter sort information
-    sorted_filter_expr: SortedFilterExpr,
     /// Input record batch buffer
     input_buffer: RecordBatch,
     /// Columns from the side
@@ -973,12 +1031,7 @@ struct OneSideHashJoiner {
 }
 
 impl OneSideHashJoiner {
-    pub fn new(
-        build_side: JoinSide,
-        sorted_filter_expr: SortedFilterExpr,
-        on: Vec<Column>,
-        schema: SchemaRef,
-    ) -> Self {
+    pub fn new(build_side: JoinSide, on: Vec<Column>, schema: SchemaRef) -> Self {
         Self {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
@@ -986,7 +1039,6 @@ impl OneSideHashJoiner {
             hashmap: JoinHashMap(RawTable::with_capacity(10_000)),
             row_hash_values: VecDeque::new(),
             hashes_buffer: vec![],
-            sorted_filter_expr,
             visited_rows: HashSet::new(),
             offset: 0,
             deleted_offset: 0,
@@ -1052,7 +1104,7 @@ impl OneSideHashJoiner {
         schema: &SchemaRef,
         join_type: JoinType,
         on_probe: &[Column],
-        filter: &JoinFilter,
+        filter: Option<&JoinFilter>,
         probe_batch: &RecordBatch,
         probe_visited: &mut HashSet<usize>,
         probe_offset: usize,
@@ -1061,7 +1113,7 @@ impl OneSideHashJoiner {
         null_equals_null: bool,
     ) -> Result<Option<RecordBatch>> {
         if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
-            return Ok(Some(RecordBatch::new_empty(schema.clone())));
+            return Ok(None);
         }
         let (build_indices, probe_indices) = build_join_indices(
             probe_batch,
@@ -1069,7 +1121,7 @@ impl OneSideHashJoiner {
             &self.input_buffer,
             &self.on,
             on_probe,
-            Some(filter),
+            filter,
             random_state,
             null_equals_null,
             &mut self.hashes_buffer,
@@ -1104,7 +1156,7 @@ impl OneSideHashJoiner {
                 column_indices,
                 self.build_side,
             )
-            .map(Some)
+            .map(|batch| (batch.num_rows() > 0).then_some(batch))
         }
     }
 
@@ -1156,7 +1208,7 @@ impl OneSideHashJoiner {
                 column_indices,
                 self.build_side,
             )
-            .map(Some)
+            .map(|batch| (batch.num_rows() > 0).then_some(batch))
         } else {
             // If we don't need to produce a result, return None
             Ok(None)
@@ -1178,55 +1230,54 @@ impl OneSideHashJoiner {
     /// * `join_type` - The type of join (e.g. inner, left, right, etc.).
     /// * `column_indices` - A vector of column indices that specifies which columns from the
     ///     build side should be included in the output.
-    /// * `physical_expr_graph` - A mutable reference to the physical expression graph.
+    /// * `graph` - A mutable reference to the physical expression graph.
     ///
     /// # Returns
     ///
     /// If there are rows to prune, returns the pruned build side record batch wrapped in an `Ok` variant.
     /// Otherwise, returns `Ok(None)`.
-    fn prune_with_probe_batch(
+    fn calculate_prune_length_with_probe_batch(
         &mut self,
-        schema: &SchemaRef,
-        probe_batch: &RecordBatch,
+        build_side_sorted_filter_expr: &mut SortedFilterExpr,
         probe_side_sorted_filter_expr: &mut SortedFilterExpr,
-        join_type: JoinType,
-        column_indices: &[ColumnIndex],
-        physical_expr_graph: &mut ExprIntervalGraph,
-    ) -> Result<Option<RecordBatch>> {
-        // Check if the input buffer is empty:
+        graph: &mut ExprIntervalGraph,
+    ) -> Result<usize> {
+        // Return early if the input buffer is empty:
         if self.input_buffer.num_rows() == 0 {
-            return Ok(None);
+            return Ok(0);
         }
-        // Convert the sorted filter expressions into a vector of (node_index, interval)
-        // tuples for use when updating the interval graph.
-        let mut filter_intervals = vec![
-            (
-                self.sorted_filter_expr.node_index(),
-                self.sorted_filter_expr.interval().clone(),
-            ),
-            (
-                probe_side_sorted_filter_expr.node_index(),
-                probe_side_sorted_filter_expr.interval().clone(),
-            ),
-        ];
-        // Use the join filter intervals to update the physical expression graph:
-        physical_expr_graph.update_ranges(&mut filter_intervals)?;
-        // Get the new join filter interval for build side:
-        let calculated_build_side_interval = filter_intervals.remove(0).1;
-        // Check if the intervals changed, exit early if not:
-        if calculated_build_side_interval.eq(self.sorted_filter_expr.interval()) {
-            return Ok(None);
+        // Process the build and probe side sorted filter expressions if both are present:
+        // Collect the sorted filter expressions into a vector of (node_index, interval) tuples:
+        let mut filter_intervals = vec![];
+        for expr in [
+            &build_side_sorted_filter_expr,
+            &probe_side_sorted_filter_expr,
+        ] {
+            filter_intervals.push((expr.node_index(), expr.interval().clone()))
         }
-        // Determine the pruning length if there was a change in the intervals:
-        self.sorted_filter_expr
-            .set_interval(calculated_build_side_interval);
-        let prune_length =
-            determine_prune_length(&self.input_buffer, &self.sorted_filter_expr)?;
-        // If we can not prune, exit early:
-        if prune_length == 0 {
-            return Ok(None);
+        // Update the physical expression graph using the join filter intervals:
+        graph.update_ranges(&mut filter_intervals)?;
+        // Extract the new join filter interval for the build side:
+        let calculated_build_side_interval = filter_intervals.remove(0).1;
+        // If the intervals have not changed, return early without pruning:
+        if calculated_build_side_interval.eq(build_side_sorted_filter_expr.interval()) {
+            return Ok(0);
         }
-        // Compute the result, and perform pruning if there are rows to prune:
+        // Update the build side interval and determine the pruning length:
+        build_side_sorted_filter_expr.set_interval(calculated_build_side_interval);
+
+        determine_prune_length(&self.input_buffer, build_side_sorted_filter_expr)
+    }
+
+    fn prune_internal_state_and_build_anti_result(
+        &mut self,
+        prune_length: usize,
+        schema: &SchemaRef,
+        probe_batch: &RecordBatch,
+        join_type: JoinType,
+        column_indices: &[ColumnIndex],
+    ) -> Result<Option<RecordBatch>> {
+        // Compute the result and perform pruning if there are rows to prune:
         let result = self.build_side_determined_results(
             schema,
             prune_length,
@@ -1234,18 +1285,22 @@ impl OneSideHashJoiner {
             join_type,
             column_indices,
         );
+        // Prune the hash values:
         prune_hash_values(
             prune_length,
             &mut self.hashmap,
             &mut self.row_hash_values,
             self.deleted_offset as u64,
         )?;
+        // Remove pruned rows from the visited rows set:
         for row in self.deleted_offset..(self.deleted_offset + prune_length) {
             self.visited_rows.remove(&row);
         }
+        // Update the input buffer after pruning:
         self.input_buffer = self
             .input_buffer
             .slice(prune_length, self.input_buffer.num_rows() - prune_length);
+        // Increment the deleted offset:
         self.deleted_offset += prune_length;
         result
     }
@@ -1328,6 +1383,8 @@ impl SymmetricHashJoinStream {
                 input_stream,
                 probe_hash_joiner,
                 build_hash_joiner,
+                probe_side_sorted_filter_expr,
+                build_side_sorted_filter_expr,
                 build_join_side,
                 probe_side_metrics,
             ) = if self.probe_side.eq(&JoinSide::Left) {
@@ -1335,6 +1392,8 @@ impl SymmetricHashJoinStream {
                     &mut self.left_stream,
                     &mut self.left,
                     &mut self.right,
+                    &mut self.left_sorted_filter_expr,
+                    &mut self.right_sorted_filter_expr,
                     JoinSide::Right,
                     &mut self.metrics.left,
                 )
@@ -1343,6 +1402,8 @@ impl SymmetricHashJoinStream {
                     &mut self.right_stream,
                     &mut self.right,
                     &mut self.left,
+                    &mut self.right_sorted_filter_expr,
+                    &mut self.left_sorted_filter_expr,
                     JoinSide::Left,
                     &mut self.metrics.right,
                 )
@@ -1357,19 +1418,12 @@ impl SymmetricHashJoinStream {
                     // Update the internal state of the hash joiner for the build side:
                     probe_hash_joiner
                         .update_internal_state(&probe_batch, &self.random_state)?;
-                    // Calculate filter intervals:
-                    calculate_filter_expr_intervals(
-                        &build_hash_joiner.input_buffer,
-                        &mut build_hash_joiner.sorted_filter_expr,
-                        &probe_batch,
-                        &mut probe_hash_joiner.sorted_filter_expr,
-                    )?;
                     // Join the two sides:
                     let equal_result = build_hash_joiner.join_with_probe_batch(
                         &self.schema,
                         self.join_type,
                         &probe_hash_joiner.on,
-                        &self.filter,
+                        self.filter.as_ref(),
                         &probe_batch,
                         &mut probe_hash_joiner.visited_rows,
                         probe_hash_joiner.offset,
@@ -1379,16 +1433,45 @@ impl SymmetricHashJoinStream {
                     )?;
                     // Increment the offset for the probe hash joiner:
                     probe_hash_joiner.offset += probe_batch.num_rows();
-                    // Prune the build side input buffer using the expression
-                    // DAG and filter intervals:
-                    let anti_result = build_hash_joiner.prune_with_probe_batch(
-                        &self.schema,
-                        &probe_batch,
-                        &mut probe_hash_joiner.sorted_filter_expr,
-                        self.join_type,
-                        &self.column_indices,
-                        &mut self.physical_expr_graph,
-                    )?;
+
+                    let anti_result = if let (
+                        Some(build_side_sorted_filter_expr),
+                        Some(probe_side_sorted_filter_expr),
+                        Some(graph),
+                    ) = (
+                        build_side_sorted_filter_expr.as_mut(),
+                        probe_side_sorted_filter_expr.as_mut(),
+                        self.graph.as_mut(),
+                    ) {
+                        // Calculate filter intervals:
+                        calculate_filter_expr_intervals(
+                            &build_hash_joiner.input_buffer,
+                            build_side_sorted_filter_expr,
+                            &probe_batch,
+                            probe_side_sorted_filter_expr,
+                        )?;
+                        let prune_length = build_hash_joiner
+                            .calculate_prune_length_with_probe_batch(
+                                build_side_sorted_filter_expr,
+                                probe_side_sorted_filter_expr,
+                                graph,
+                            )?;
+
+                        if prune_length > 0 {
+                            build_hash_joiner.prune_internal_state_and_build_anti_result(
+                                prune_length,
+                                &self.schema,
+                                &probe_batch,
+                                self.join_type,
+                                &self.column_indices,
+                            )?
+                        } else {
+                            None
+                        }
+                    } else {
+                        None
+                    };
+
                     // Combine results:
                     let result =
                         combine_two_batches(&self.schema, equal_result, anti_result)?;
@@ -1446,12 +1529,12 @@ mod tests {
     use crate::physical_plan::{
         collect, common, memory::MemoryExec, repartition::RepartitionExec,
     };
-    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
     use crate::test_util;
 
     use super::*;
 
-    const TABLE_SIZE: i32 = 1_000;
+    const TABLE_SIZE: i32 = 100;
 
     fn compare_batches(collected_1: &[RecordBatch], collected_2: &[RecordBatch]) {
         // compare
@@ -1479,7 +1562,7 @@ mod tests {
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
         on: JoinOn,
-        filter: JoinFilter,
+        filter: Option<JoinFilter>,
         join_type: &JoinType,
         null_equals_null: bool,
         context: Arc<TaskContext>,
@@ -1530,7 +1613,7 @@ mod tests {
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
         on: JoinOn,
-        filter: JoinFilter,
+        filter: Option<JoinFilter>,
         join_type: &JoinType,
         null_equals_null: bool,
         context: Arc<TaskContext>,
@@ -1552,7 +1635,7 @@ mod tests {
                 Partitioning::Hash(right_expr, partition_count),
             )?),
             on,
-            Some(filter),
+            filter,
             join_type,
             PartitionMode::Partitioned,
             null_equals_null,
@@ -1740,34 +1823,33 @@ mod tests {
     fn create_memory_table(
         left_batch: RecordBatch,
         right_batch: RecordBatch,
-        left_sorted: Vec<PhysicalSortExpr>,
-        right_sorted: Vec<PhysicalSortExpr>,
+        left_sorted: Option<Vec<PhysicalSortExpr>>,
+        right_sorted: Option<Vec<PhysicalSortExpr>>,
         batch_size: usize,
     ) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
-        Ok((
-            Arc::new(
-                MemoryExec::try_new(
-                    &[split_record_batches(&left_batch, batch_size).unwrap()],
-                    left_batch.schema(),
-                    None,
-                )?
-                .with_sort_information(left_sorted),
-            ),
-            Arc::new(
-                MemoryExec::try_new(
-                    &[split_record_batches(&right_batch, batch_size).unwrap()],
-                    right_batch.schema(),
-                    None,
-                )?
-                .with_sort_information(right_sorted),
-            ),
-        ))
+        let mut left = MemoryExec::try_new(
+            &[split_record_batches(&left_batch, batch_size)?],
+            left_batch.schema(),
+            None,
+        )?;
+        if let Some(sorted) = left_sorted {
+            left = left.with_sort_information(sorted);
+        }
+        let mut right = MemoryExec::try_new(
+            &[split_record_batches(&right_batch, batch_size)?],
+            right_batch.schema(),
+            None,
+        )?;
+        if let Some(sorted) = right_sorted {
+            right = right.with_sort_information(sorted);
+        }
+        Ok((Arc::new(left), Arc::new(right)))
     }
 
     async fn experiment(
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
-        filter: JoinFilter,
+        filter: Option<JoinFilter>,
         join_type: JoinType,
         on: JoinOn,
         task_ctx: Arc<TaskContext>,
@@ -1813,8 +1895,7 @@ mod tests {
         cardinality: (i32, i32),
     ) -> Result<()> {
         // a + b > c + 10 AND a + b < c + 100
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
+        let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
@@ -1833,8 +1914,13 @@ mod tests {
             expr: col("ra1", right_schema)?,
             options: SortOptions::default(),
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -1863,7 +1949,7 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -1890,8 +1976,7 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1, 2, 3, 4)] case_expr: usize,
     ) -> Result<()> {
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
+        let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
@@ -1905,8 +1990,13 @@ mod tests {
             expr: col("ra1", right_schema)?,
             options: SortOptions::default(),
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -1934,38 +2024,40 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test(flavor = "multi_thread")]
-    async fn single_test() -> Result<()> {
-        let case_expr = 1;
-        let cardinality = (11, 21);
-        let join_type = JoinType::Full;
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
+    async fn join_without_sort_information(
+        #[values(
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::RightSemi,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+            JoinType::RightAnti,
+            JoinType::Full
+        )]
+        join_type: JoinType,
+        #[values(
+        (4, 5),
+        (11, 21),
+        (31, 71),
+        (99, 12),
+        )]
+        cardinality: (i32, i32),
+        #[values(0, 1, 2, 3, 4)] case_expr: usize,
+    ) -> Result<()> {
+        let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
         let right_schema = &right_batch.schema();
-        let left_sorted = vec![PhysicalSortExpr {
-            expr: col("la1_des", left_schema)?,
-            options: SortOptions {
-                descending: true,
-                nulls_first: true,
-            },
-        }];
-        let right_sorted = vec![PhysicalSortExpr {
-            expr: col("ra1_des", right_schema)?,
-            options: SortOptions {
-                descending: true,
-                nulls_first: true,
-            },
-        }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(left_batch, right_batch, None, None, 13)?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -1993,7 +2085,37 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
+        Ok(())
+    }
+
+    #[rstest]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn join_without_filter(
+        #[values(
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::RightSemi,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+            JoinType::RightAnti,
+            JoinType::Full
+        )]
+        join_type: JoinType,
+    ) -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, (11, 21))?;
+        let left_schema = &left_batch.schema();
+        let right_schema = &right_batch.schema();
+        let (left, right) = create_memory_table(left_batch, right_batch, None, None, 13)?;
+
+        let on = vec![(
+            Column::new_with_schema("lc1", left_schema)?,
+            Column::new_with_schema("rc1", right_schema)?,
+        )];
+        experiment(left, right, None, join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -2020,8 +2142,7 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1, 2, 3, 4)] case_expr: usize,
     ) -> Result<()> {
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
+        let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
@@ -2041,8 +2162,13 @@ mod tests {
                 nulls_first: true,
             },
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -2070,7 +2196,7 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -2107,6 +2233,79 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test(flavor = "multi_thread")]
+    async fn join_change_in_planner_without_sort() -> Result<()> {
+        let config = SessionConfig::new().with_target_partitions(1);
+        let ctx = SessionContext::with_config(config);
+        let tmp_dir = TempDir::new()?;
+        let left_file_path = tmp_dir.path().join("left.csv");
+        File::create(left_file_path.clone())?;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::UInt32, false),
+            Field::new("a2", DataType::UInt32, false),
+        ]));
+        ctx.register_csv(
+            "left",
+            left_file_path.as_os_str().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema).mark_infinite(true),
+        )
+        .await?;
+        let right_file_path = tmp_dir.path().join("right.csv");
+        File::create(right_file_path.clone())?;
+        ctx.register_csv(
+            "right",
+            right_file_path.as_os_str().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema).mark_infinite(true),
+        )
+        .await?;
+        let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
+        let physical_plan = df.create_physical_plan().await?;
+        let task_ctx = ctx.task_ctx();
+        let results = collect(physical_plan.clone(), task_ctx).await?;
+        let formatted = pretty_format_batches(&results)?.to_string();
+        let found = formatted
+            .lines()
+            .any(|line| line.contains("SymmetricHashJoinExec"));
+        assert!(found);
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
+        let config =
+            SessionConfig::new().with_allow_symmetric_joins_without_pruning(false);
+        let ctx = SessionContext::with_config(config);
+        let tmp_dir = TempDir::new()?;
+        let left_file_path = tmp_dir.path().join("left.csv");
+        File::create(left_file_path.clone())?;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::UInt32, false),
+            Field::new("a2", DataType::UInt32, false),
+        ]));
+        ctx.register_csv(
+            "left",
+            left_file_path.as_os_str().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema).mark_infinite(true),
+        )
+        .await?;
+        let right_file_path = tmp_dir.path().join("right.csv");
+        File::create(right_file_path.clone())?;
+        ctx.register_csv(
+            "right",
+            right_file_path.as_os_str().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema).mark_infinite(true),
+        )
+        .await?;
+        let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
+        match df.create_physical_plan().await {
+            Ok(_) => panic!("Expecting error."),
+            Err(e) => {
+                assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError during planning: Join operation cannot operate on a non-prunable stream without enabling the 'allow_symmetric_joins_without_pruning' configuration flag")
+            }
+        }
+        Ok(())
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     async fn build_null_columns_first() -> Result<()> {
         let join_type = JoinType::Full;
@@ -2133,8 +2332,13 @@ mod tests {
                 nulls_first: true,
             },
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -2161,7 +2365,7 @@ mod tests {
             },
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -2191,8 +2395,13 @@ mod tests {
                 nulls_first: false,
             },
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -2220,7 +2429,7 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -2250,8 +2459,13 @@ mod tests {
                 nulls_first: true,
             },
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -2279,7 +2493,7 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -2305,8 +2519,13 @@ mod tests {
             expr: col("ra1", right_schema)?,
             options: SortOptions::default(),
         }];
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 13)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            13,
+        )?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -2335,7 +2554,7 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        experiment(left, right, filter, join_type, on, task_ctx).await?;
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
 
@@ -2382,8 +2601,13 @@ mod tests {
             options: SortOptions::default(),
         }];
         // Construct MemoryExec
-        let (left, right) =
-            create_memory_table(left_batch, right_batch, left_sorted, right_sorted, 10)?;
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            Some(left_sorted),
+            Some(right_sorted),
+            10,
+        )?;
 
         // Filter columns, ensure first batches will have matching rows.
         let intermediate_schema = Schema::new(vec![
@@ -2414,30 +2638,14 @@ mod tests {
         ];
         let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
 
-        let left_sorted_filter_expr = SortedFilterExpr::new(
-            PhysicalSortExpr {
-                expr: col("la1", &left_schema)?,
-                options: SortOptions::default(),
-            },
-            Arc::new(Column::new("0", 0)),
-        );
         let mut left_side_joiner = OneSideHashJoiner::new(
             JoinSide::Left,
-            left_sorted_filter_expr,
             vec![Column::new_with_schema("lc1", &left_schema)?],
             left_schema,
         );
 
-        let right_sorted_filter_expr = SortedFilterExpr::new(
-            PhysicalSortExpr {
-                expr: col("ra1", &right_schema)?,
-                options: SortOptions::default(),
-            },
-            Arc::new(Column::new("1", 0)),
-        );
         let mut right_side_joiner = OneSideHashJoiner::new(
             JoinSide::Right,
-            right_sorted_filter_expr,
             vec![Column::new_with_schema("rc1", &right_schema)?],
             right_schema,
         );
@@ -2463,7 +2671,7 @@ mod tests {
             &join_schema,
             join_type,
             &right_side_joiner.on,
-            &filter,
+            Some(&filter),
             &initial_right_batch,
             &mut right_side_joiner.visited_rows,
             right_side_joiner.offset,
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index d8b66b0a29..a3dfe42e21 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -22,27 +22,24 @@
 mod unix_test {
     use arrow::array::Array;
     use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::execution::options::ReadOptions;
     use datafusion::{
         prelude::{CsvReadOptions, SessionConfig, SessionContext},
-        test_util::{
-            aggr_test_schema, arrow_test_data, test_create_unbounded_sorted_file,
-        },
+        test_util::{aggr_test_schema, arrow_test_data},
     };
     use datafusion_common::{DataFusionError, Result};
+    use datafusion_expr::Expr;
     use futures::StreamExt;
     use itertools::enumerate;
     use nix::sys::stat;
     use nix::unistd;
-    use rand::rngs::StdRng;
-    use rand::{Rng, SeedableRng};
     use rstest::*;
     use std::fs::{File, OpenOptions};
     use std::io::Write;
     use std::path::Path;
     use std::path::PathBuf;
-    use std::sync::mpsc;
-    use std::sync::mpsc::{Receiver, Sender};
-    use std::sync::{Arc, Mutex};
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::Arc;
     use std::thread;
     use std::thread::JoinHandle;
     use std::time::{Duration, Instant};
@@ -73,40 +70,81 @@ mod unix_test {
         line: &str,
         ref_time: Instant,
         broken_pipe_timeout: Duration,
-    ) -> Result<usize> {
+    ) -> Result<()> {
         // We need to handle broken pipe error until the reader is ready. This
         // is why we use a timeout to limit the wait duration for the reader.
         // If the error is different than broken pipe, we fail immediately.
-        file.write(line.as_bytes()).or_else(|e| {
+        while let Err(e) = file.write_all(line.as_bytes()) {
             if e.raw_os_error().unwrap() == 32 {
                 let interval = Instant::now().duration_since(ref_time);
                 if interval < broken_pipe_timeout {
                     thread::sleep(Duration::from_millis(100));
-                    return Ok(0);
+                    continue;
                 }
             }
-            Err(DataFusionError::Execution(e.to_string()))
-        })
+            return Err(DataFusionError::Execution(e.to_string()));
+        }
+        Ok(())
     }
 
-    async fn create_ctx(
-        fifo_path: &Path,
-        with_unbounded_execution: bool,
-    ) -> Result<SessionContext> {
-        let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE);
+    // This test provides a relatively realistic end-to-end scenario where
+    // we swap join sides to accommodate a FIFO source.
+    #[rstest]
+    #[timeout(std::time::Duration::from_secs(30))]
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn unbounded_file_with_swapped_join(
+        #[values(true, false)] unbounded_file: bool,
+    ) -> Result<()> {
+        // Create session context
+        let config = SessionConfig::new()
+            .with_batch_size(TEST_BATCH_SIZE)
+            .with_collect_statistics(false)
+            .with_target_partitions(1);
         let ctx = SessionContext::with_config(config);
-        // Register left table
-        let left_schema = Arc::new(Schema::new(vec![
+        // To make unbounded deterministic
+        let waiting = Arc::new(AtomicBool::new(unbounded_file));
+        // Create a new temporary FIFO file
+        let tmp_dir = TempDir::new()?;
+        let fifo_path =
+            create_fifo_file(&tmp_dir, &format!("fifo_{:?}.csv", unbounded_file))?;
+        // Execution can calculated at least one RecordBatch after the number of
+        // "joinable_lines_length" lines are read.
+        let joinable_lines_length =
+            (TEST_DATA_SIZE as f64 * TEST_JOIN_RATIO).round() as usize;
+        // The row including "a" is joinable with aggregate_test_100.c1
+        let joinable_iterator = (0..joinable_lines_length).map(|_| "a".to_string());
+        let second_joinable_iterator =
+            (0..joinable_lines_length).map(|_| "a".to_string());
+        // The row including "zzz" is not joinable with aggregate_test_100.c1
+        let non_joinable_iterator =
+            (0..(TEST_DATA_SIZE - joinable_lines_length)).map(|_| "zzz".to_string());
+        let lines = joinable_iterator
+            .chain(non_joinable_iterator)
+            .chain(second_joinable_iterator)
+            .zip(0..TEST_DATA_SIZE)
+            .map(|(a1, a2)| format!("{a1},{a2}\n"))
+            .collect::<Vec<_>>();
+        // Create writing threads for the left and right FIFO files
+        let task = create_writing_thread(
+            fifo_path.clone(),
+            "a1,a2\n".to_owned(),
+            lines,
+            waiting.clone(),
+            joinable_lines_length,
+        );
+
+        // Data Schema
+        let schema = Arc::new(Schema::new(vec![
             Field::new("a1", DataType::Utf8, false),
             Field::new("a2", DataType::UInt32, false),
         ]));
+        // Create a file with bounded or unbounded flag.
         ctx.register_csv(
             "left",
             fifo_path.as_os_str().to_str().unwrap(),
             CsvReadOptions::new()
-                .schema(left_schema.as_ref())
-                .has_header(false)
-                .mark_infinite(with_unbounded_execution),
+                .schema(schema.as_ref())
+                .mark_infinite(unbounded_file),
         )
         .await?;
         // Register right table
@@ -118,119 +156,76 @@ mod unix_test {
             CsvReadOptions::new().schema(schema.as_ref()),
         )
         .await?;
-        Ok(ctx)
+        // Execute the query
+        let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?;
+        let mut stream = df.execute_stream().await?;
+        while (stream.next().await).is_some() {
+            waiting.store(false, Ordering::SeqCst);
+        }
+        task.join().unwrap();
+        Ok(())
     }
 
     #[derive(Debug, PartialEq)]
-    enum Operation {
-        Read,
-        Write,
-    }
-
-    /// Checks if there is a [Operation::Read] between [Operation::Write]s.
-    /// This indicates we did not wait for the file to finish before processing it.
-    fn interleave(result: &[Operation]) -> bool {
-        let first_read = result.iter().position(|op| op == &Operation::Read);
-        let last_write = result.iter().rev().position(|op| op == &Operation::Write);
-        match (first_read, last_write) {
-            (Some(first_read), Some(last_write)) => {
-                result.len() - 1 - last_write > first_read
-            }
-            (_, _) => false,
-        }
+    enum JoinOperation {
+        LeftUnmatched,
+        RightUnmatched,
+        Equal,
     }
 
-    // This test provides a relatively realistic end-to-end scenario where
-    // we swap join sides to accommodate a FIFO source.
-    #[rstest]
-    #[timeout(std::time::Duration::from_secs(30))]
-    #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
-    async fn unbounded_file_with_swapped_join(
-        #[values(true, false)] unbounded_file: bool,
-    ) -> Result<()> {
-        // To make unbounded deterministic
-        let waiting = Arc::new(Mutex::new(unbounded_file));
-        let waiting_thread = waiting.clone();
-        // Create a channel
-        let (tx, rx): (Sender<Operation>, Receiver<Operation>) = mpsc::channel();
-        // Create a new temporary FIFO file
-        let tmp_dir = TempDir::new()?;
-        let fifo_path = create_fifo_file(&tmp_dir, "first_fifo.csv")?;
-        // Prevent move
-        let fifo_path_thread = fifo_path.clone();
+    fn create_writing_thread(
+        file_path: PathBuf,
+        header: String,
+        lines: Vec<String>,
+        waiting_lock: Arc<AtomicBool>,
+        wait_until: usize,
+    ) -> JoinHandle<()> {
         // Timeout for a long period of BrokenPipe error
-        let broken_pipe_timeout = Duration::from_secs(5);
-        // The sender endpoint can be copied
-        let thread_tx = tx.clone();
+        let broken_pipe_timeout = Duration::from_secs(10);
         // Spawn a new thread to write to the FIFO file
-        let fifo_writer = thread::spawn(move || {
-            let first_file = OpenOptions::new()
-                .write(true)
-                .open(fifo_path_thread)
-                .unwrap();
+        thread::spawn(move || {
+            let file = OpenOptions::new().write(true).open(file_path).unwrap();
             // Reference time to use when deciding to fail the test
             let execution_start = Instant::now();
-            // Execution can calculated at least one RecordBatch after the number of
-            // "joinable_lines_length" lines are read.
-            let joinable_lines_length =
-                (TEST_DATA_SIZE as f64 * TEST_JOIN_RATIO).round() as usize;
-            // The row including "a" is joinable with aggregate_test_100.c1
-            let joinable_iterator = (0..joinable_lines_length).map(|_| "a".to_string());
-            let second_joinable_iterator =
-                (0..joinable_lines_length).map(|_| "a".to_string());
-            // The row including "zzz" is not joinable with aggregate_test_100.c1
-            let non_joinable_iterator =
-                (0..(TEST_DATA_SIZE - joinable_lines_length)).map(|_| "zzz".to_string());
-            let string_array = joinable_iterator
-                .chain(non_joinable_iterator)
-                .chain(second_joinable_iterator);
-            for (cnt, string_col) in enumerate(string_array) {
-                // Wait a reading sign for unbounded execution
-                // For unbounded execution:
-                //  After joinable_lines_length FIFO reading, we MUST get a Operation::Read.
-                // For bounded execution:
-                //  Never goes into while loop since waiting_thread initiated as false.
-                while *waiting_thread.lock().unwrap() && joinable_lines_length < cnt {
-                    thread::sleep(Duration::from_millis(200));
-                }
-                // Each thread queues a message in the channel
-                if cnt % TEST_BATCH_SIZE == 0 {
-                    thread_tx.send(Operation::Write).unwrap();
+            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
+            for (cnt, line) in enumerate(lines) {
+                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+                    thread::sleep(Duration::from_millis(50));
                 }
-                let line = format!("{string_col},{cnt}\n").to_owned();
-                write_to_fifo(&first_file, &line, execution_start, broken_pipe_timeout)
+                write_to_fifo(&file, &line, execution_start, broken_pipe_timeout)
                     .unwrap();
             }
-        });
-        // Collects operations from both writer and executor.
-        let result_collector = thread::spawn(move || {
-            let mut results = vec![];
-            while let Ok(res) = rx.recv() {
-                results.push(res);
-            }
-            results
-        });
-        // Create an execution case with bounded or unbounded flag.
-        let ctx = create_ctx(&fifo_path, unbounded_file).await?;
-        // Execute the query
-        let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?;
-        let mut stream = df.execute_stream().await?;
-        while (stream.next().await).is_some() {
-            *waiting.lock().unwrap() = false;
-            tx.send(Operation::Read).unwrap();
-        }
-        fifo_writer.join().unwrap();
-        drop(tx);
-        let result = result_collector.join().unwrap();
-        assert_eq!(interleave(&result), unbounded_file);
-        Ok(())
+            drop(file);
+        })
     }
 
-    #[derive(Debug, PartialEq)]
-    enum JoinOperation {
-        LeftUnmatched,
-        RightUnmatched,
-        Equal,
+    /// This function creates an unbounded sorted file for testing purposes.
+    pub async fn register_unbounded_file_with_ordering(
+        ctx: &SessionContext,
+        schema: arrow::datatypes::SchemaRef,
+        file_path: &Path,
+        table_name: &str,
+        file_sort_order: Option<Vec<Expr>>,
+        with_unbounded_execution: bool,
+    ) -> Result<()> {
+        // Mark infinite and provide schema:
+        let fifo_options = CsvReadOptions::new()
+            .schema(schema.as_ref())
+            .mark_infinite(with_unbounded_execution);
+        // Get listing options:
+        let options_sort = fifo_options
+            .to_listing_options(&ctx.copied_config())
+            .with_file_sort_order(file_sort_order);
+        // Register table:
+        ctx.register_listing_table(
+            table_name,
+            file_path.as_os_str().to_str().unwrap(),
+            options_sort,
+            Some(schema),
+            None,
+        )
+        .await?;
+        Ok(())
     }
 
     // This test provides a relatively realistic end-to-end scenario where
@@ -239,84 +234,122 @@ mod unix_test {
     #[rstest]
     #[timeout(std::time::Duration::from_secs(30))]
     #[tokio::test(flavor = "multi_thread")]
+    #[ignore]
     async fn unbounded_file_with_symmetric_join() -> Result<()> {
-        // To make unbounded deterministic
-        let waiting = Arc::new(Mutex::new(true));
-        let thread_bools = vec![waiting.clone(), waiting.clone()];
-        // Create a new temporary FIFO file
-        let tmp_dir = TempDir::new()?;
-        let file_names = vec!["first_fifo.csv", "second_fifo.csv"];
-        // The sender endpoint can be copied
-        let (threads, file_paths): (Vec<JoinHandle<()>>, Vec<PathBuf>) = file_names
-            .iter()
-            .zip(thread_bools.iter())
-            .map(|(file_name, lock)| {
-                let waiting_thread = lock.clone();
-                let fifo_path = create_fifo_file(&tmp_dir, file_name).unwrap();
-                let return_path = fifo_path.clone();
-                // Timeout for a long period of BrokenPipe error
-                let broken_pipe_timeout = Duration::from_secs(5);
-                // Spawn a new thread to write to the FIFO file
-                let fifo_writer = thread::spawn(move || {
-                    let mut rng = StdRng::seed_from_u64(42);
-                    let file = OpenOptions::new()
-                        .write(true)
-                        .open(fifo_path.clone())
-                        .unwrap();
-                    // Reference time to use when deciding to fail the test
-                    let execution_start = Instant::now();
-                    // Join filter
-                    let a1_iter = (0..TEST_DATA_SIZE).map(|x| {
-                        if rng.gen_range(0.0..1.0) < 0.3 {
-                            x - 1
-                        } else {
-                            x
-                        }
-                    });
-                    // Join key
-                    let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
-                    for (cnt, (a1, a2)) in a1_iter.zip(a2_iter).enumerate() {
-                        // Wait a reading sign for unbounded execution
-                        // After first batch FIFO reading, we will wait for a batch created.
-                        while *waiting_thread.lock().unwrap() && TEST_BATCH_SIZE + 1 < cnt
-                        {
-                            thread::sleep(Duration::from_millis(200));
-                        }
-                        let line = format!("{a1},{a2}\n").to_owned();
-                        write_to_fifo(&file, &line, execution_start, broken_pipe_timeout)
-                            .unwrap();
-                    }
-                });
-                (fifo_writer, return_path)
-            })
-            .unzip();
+        // Create session context
         let config = SessionConfig::new()
             .with_batch_size(TEST_BATCH_SIZE)
             .set_bool("datafusion.execution.coalesce_batches", false)
             .with_target_partitions(1);
         let ctx = SessionContext::with_config(config);
-        test_create_unbounded_sorted_file(&ctx, file_paths[0].clone(), "left").await?;
-        test_create_unbounded_sorted_file(&ctx, file_paths[1].clone(), "right").await?;
-        // Execute the query
-        let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
+        // Tasks
+        let mut tasks: Vec<JoinHandle<()>> = vec![];
+
+        // Join filter
+        let a1_iter = 0..TEST_DATA_SIZE;
+        // Join key
+        let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
+        let lines = a1_iter
+            .zip(a2_iter)
+            .map(|(a1, a2)| format!("{a1},{a2}\n"))
+            .collect::<Vec<_>>();
+
+        // Create a new temporary FIFO file
+        let tmp_dir = TempDir::new()?;
+        // Create a FIFO file for the left input source.
+        let left_fifo = create_fifo_file(&tmp_dir, "left.csv")?;
+        // Create a FIFO file for the right input source.
+        let right_fifo = create_fifo_file(&tmp_dir, "right.csv")?;
+        // Create a mutex for tracking if the right input source is waiting for data.
+        let waiting = Arc::new(AtomicBool::new(true));
+
+        // Create writing threads for the left and right FIFO files
+        tasks.push(create_writing_thread(
+            left_fifo.clone(),
+            "a1,a2\n".to_owned(),
+            lines.clone(),
+            waiting.clone(),
+            TEST_BATCH_SIZE,
+        ));
+        tasks.push(create_writing_thread(
+            right_fifo.clone(),
+            "a1,a2\n".to_owned(),
+            lines.clone(),
+            waiting.clone(),
+            TEST_BATCH_SIZE,
+        ));
+
+        // Create schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::UInt32, false),
+            Field::new("a2", DataType::UInt32, false),
+        ]));
+        // Specify the ordering:
+        let file_sort_order = Some(
+            [datafusion_expr::col("a1")]
+                .into_iter()
+                .map(|e| {
+                    let ascending = true;
+                    let nulls_first = false;
+                    e.sort(ascending, nulls_first)
+                })
+                .collect::<Vec<_>>(),
+        );
+        // Set unbounded sorted files read configuration
+        register_unbounded_file_with_ordering(
+            &ctx,
+            schema.clone(),
+            &left_fifo,
+            "left",
+            file_sort_order.clone(),
+            true,
+        )
+        .await?;
+        register_unbounded_file_with_ordering(
+            &ctx,
+            schema,
+            &right_fifo,
+            "right",
+            file_sort_order,
+            true,
+        )
+        .await?;
+        // Execute the query, with no matching rows. (since key is modulus 10)
+        let df = ctx
+            .sql(
+                "SELECT
+                                      t1.a1,
+                                      t1.a2,
+                                      t2.a1,
+                                      t2.a2
+                                    FROM
+                                      left as t1 FULL
+                                      JOIN right as t2 ON t1.a2 = t2.a2
+                                      AND t1.a1 > t2.a1 + 4
+                                      AND t1.a1 < t2.a1 + 9",
+            )
+            .await?;
         let mut stream = df.execute_stream().await?;
         let mut operations = vec![];
+        // Partial.
         while let Some(Ok(batch)) = stream.next().await {
-            *waiting.lock().unwrap() = false;
-            let op = if batch.column(0).null_count() > 0 {
-                JoinOperation::LeftUnmatched
-            } else if batch.column(2).null_count() > 0 {
+            waiting.store(false, Ordering::SeqCst);
+            let left_unmatched = batch.column(2).null_count();
+            let right_unmatched = batch.column(0).null_count();
+            let op = if left_unmatched == 0 && right_unmatched == 0 {
+                JoinOperation::Equal
+            } else if right_unmatched > left_unmatched {
                 JoinOperation::RightUnmatched
             } else {
-                JoinOperation::Equal
+                JoinOperation::LeftUnmatched
             };
             operations.push(op);
         }
-
+        tasks.into_iter().for_each(|jh| jh.join().unwrap());
         // The SymmetricHashJoin executor produces FULL join results at every
         // pruning, which happens before it reaches the end of input and more
         // than once. In this test, we feed partially joinable data to both
-        // sides in order to ensure that both left/right unmatched results are
+        // sides in order to ensure that left or right unmatched results are
         // generated more than once during the test.
         assert!(
             operations
@@ -330,7 +363,6 @@ mod unix_test {
                     .count()
                     > 1
         );
-        threads.into_iter().for_each(|j| j.join().unwrap());
         Ok(())
     }
 }
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 25e4195fba..3adf5585d7 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -149,6 +149,7 @@ datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
+datafusion.optimizer.allow_symmetric_joins_without_pruning true
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs
index dbaaba304b..8f0094fadf 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -186,6 +186,12 @@ impl SessionConfig {
         self
     }
 
+    /// Enables or disables the allowing unordered symmetric hash join
+    pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
+        self.options.optimizer.allow_symmetric_joins_without_pruning = enabled;
+        self
+    }
+
     /// Enables or disables the use of repartitioning for file scans
     pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
         self.options.optimizer.repartition_file_scans = enabled;
diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs
index 66367001c6..a54bcb2f35 100644
--- a/datafusion/physical-expr/src/intervals/cp_solver.rs
+++ b/datafusion/physical-expr/src/intervals/cp_solver.rs
@@ -29,8 +29,10 @@ use petgraph::stable_graph::{DefaultIx, StableGraph};
 use petgraph::visit::{Bfs, Dfs, DfsPostOrder, EdgeRef};
 use petgraph::Outgoing;
 
-use crate::expressions::Literal;
-use crate::intervals::interval_aritmetic::{apply_operator, Interval};
+use crate::expressions::{BinaryExpr, CastExpr, Column, Literal};
+use crate::intervals::interval_aritmetic::{
+    apply_operator, is_operator_supported, Interval,
+};
 use crate::utils::{build_dag, ExprTreeNode};
 use crate::PhysicalExpr;
 
@@ -521,6 +523,22 @@ impl ExprIntervalGraph {
     }
 }
 
+/// Indicates whether interval arithmetic is supported for the given expression.
+/// Currently, we do not support all [`PhysicalExpr`]s for interval calculations.
+/// We do not support every type of [`Operator`]s either. Over time, this check
+/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
+/// Currently, [`CastExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
+pub fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    let expr_any = expr.as_any();
+    let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
+    {
+        is_operator_supported(binary_expr.op())
+    } else {
+        expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
+    };
+    expr_supported && expr.children().iter().all(check_support)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr/src/intervals/mod.rs b/datafusion/physical-expr/src/intervals/mod.rs
index 45616534cb..9883ba15b2 100644
--- a/datafusion/physical-expr/src/intervals/mod.rs
+++ b/datafusion/physical-expr/src/intervals/mod.rs
@@ -22,5 +22,5 @@ pub mod cp_solver;
 pub mod interval_aritmetic;
 
 pub mod test_utils;
-pub use cp_solver::ExprIntervalGraph;
+pub use cp_solver::{check_support, ExprIntervalGraph};
 pub use interval_aritmetic::*;
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 1e457ee9b6..749a0bcb06 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -35,40 +35,41 @@ Values are parsed according to the [same rules used in casts from Utf8](https://
 If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted.
 Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions.
 
-| key                                                       | default    | description                                                                                                                                                                                                                                                                                                                                                                                                                       [...]
-| --------------------------------------------------------- | ---------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| datafusion.catalog.create_default_catalog_and_schema      | true       | Whether the default catalog and schema should be created automatically.                                                                                                                                                                                                                                                                                                                                                           [...]
-| datafusion.catalog.default_catalog                        | datafusion | The default catalog name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                                                                                                                                                     [...]
-| datafusion.catalog.default_schema                         | public     | The default schema name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                                                                                                                                                      [...]
-| datafusion.catalog.information_schema                     | false      | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information                                                                                                                                                                                                                                                                                                                         [...]
-| datafusion.catalog.location                               | NULL       | Location scanned to load tables for `default` schema                                                                                                                                                                                                                                                                                                                                                                              [...]
-| datafusion.catalog.format                                 | NULL       | Type of `TableProvider` to use when loading `default` schema                                                                                                                                                                                                                                                                                                                                                                      [...]
-| datafusion.catalog.has_header                             | false      | If the file has a header                                                                                                                                                                                                                                                                                                                                                                                                          [...]
-| datafusion.execution.batch_size                           | 8192       | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption                                                                                                                                                                                                                                               [...]
-| datafusion.execution.coalesce_batches                     | true       | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting                                                                                                                        [...]
-| datafusion.execution.collect_statistics                   | false      | Should DataFusion collect statistics after listing files                                                                                                                                                                                                                                                                                                                                                                          [...]
-| datafusion.execution.target_partitions                    | 0          | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system                                                                                                                                                                                                                                                                                       [...]
-| datafusion.execution.time_zone                            | +00:00     | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour                                                                                                                                                                                                                                                                   [...]
-| datafusion.execution.parquet.enable_page_index            | false      | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                                                                                                      [...]
-| datafusion.execution.parquet.pruning                      | true       | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file                                                                                                                                                                                                                                                                   [...]
-| datafusion.execution.parquet.skip_metadata                | true       | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata                                                                                                                                                                                 [...]
-| datafusion.execution.parquet.metadata_size_hint           | NULL       | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer                                                                                                                                                           [...]
-| datafusion.execution.parquet.pushdown_filters             | false      | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded                                                                                                                                                                                                                                                                                                             [...]
-| datafusion.execution.parquet.reorder_filters              | false      | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query                                                                                                                                                                                                       [...]
-| datafusion.optimizer.enable_round_robin_repartition       | true       | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores                                                                                                                                                                                                                                                                                       [...]
-| datafusion.optimizer.filter_null_join_keys                | false      | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                                                                                                   [...]
-| datafusion.optimizer.repartition_aggregations             | true       | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level                                                                                                                                                                                                                                                                                        [...]
-| datafusion.optimizer.repartition_file_min_size            | 10485760   | Minimum total files size in bytes to perform file scan repartitioning.                                                                                                                                                                                                                                                                                                                                                            [...]
-| datafusion.optimizer.repartition_joins                    | true       | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level                                                                                                                                                                                                                                                                                                  [...]
-| datafusion.optimizer.repartition_file_scans               | true       | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format in which case multiple row groups from the same file may be read concurrently. If false then each row group is read serially, though different files may be read in parallel.                                                                                                                     [...]
-| datafusion.optimizer.repartition_windows                  | true       | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level                                                                                                                                                                                                                                                                                 [...]
-| datafusion.optimizer.repartition_sorts                    | true       | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", would turn into the plan below which performs better in multithreaded environments "SortPreservingMergeExec: [a@0 [...]
-| datafusion.optimizer.skip_failed_rules                    | true       | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail                                                                                                                                                                                              [...]
-| datafusion.optimizer.max_passes                           | 3          | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                                                                                                                                              [...]
-| datafusion.optimizer.top_down_join_key_reordering         | true       | When set to true, the physical plan optimizer will run a top down process to reorder the join keys                                                                                                                                                                                                                                                                                                                                [...]
-| datafusion.optimizer.prefer_hash_join                     | true       | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory                                                                                                                                                                                                                                                             [...]
-| datafusion.optimizer.hash_join_single_partition_threshold | 1048576    | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                                                                                                    [...]
-| datafusion.explain.logical_plan_only                      | false      | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                                                                                                                                             [...]
-| datafusion.explain.physical_plan_only                     | false      | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                                                                                                                                            [...]
-| datafusion.sql_parser.parse_float_as_decimal              | false      | When set to true, SQL parser will parse float as decimal type                                                                                                                                                                                                                                                                                                                                                                     [...]
-| datafusion.sql_parser.enable_ident_normalization          | true       | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)                                                                                                                                                                                                                                                                                                                                    [...]
+| key                                                        | default    | description                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+| ---------------------------------------------------------- | ---------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| datafusion.catalog.create_default_catalog_and_schema       | true       | Whether the default catalog and schema should be created automatically.                                                                                                                                                                                                                                                                                                                                                          [...]
+| datafusion.catalog.default_catalog                         | datafusion | The default catalog name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                                                                                                                                                    [...]
+| datafusion.catalog.default_schema                          | public     | The default schema name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                                                                                                                                                     [...]
+| datafusion.catalog.information_schema                      | false      | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information                                                                                                                                                                                                                                                                                                                        [...]
+| datafusion.catalog.location                                | NULL       | Location scanned to load tables for `default` schema                                                                                                                                                                                                                                                                                                                                                                             [...]
+| datafusion.catalog.format                                  | NULL       | Type of `TableProvider` to use when loading `default` schema                                                                                                                                                                                                                                                                                                                                                                     [...]
+| datafusion.catalog.has_header                              | false      | If the file has a header                                                                                                                                                                                                                                                                                                                                                                                                         [...]
+| datafusion.execution.batch_size                            | 8192       | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption                                                                                                                                                                                                                                              [...]
+| datafusion.execution.coalesce_batches                      | true       | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting                                                                                                                       [...]
+| datafusion.execution.collect_statistics                    | false      | Should DataFusion collect statistics after listing files                                                                                                                                                                                                                                                                                                                                                                         [...]
+| datafusion.execution.target_partitions                     | 0          | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system                                                                                                                                                                                                                                                                                      [...]
+| datafusion.execution.time_zone                             | +00:00     | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour                                                                                                                                                                                                                                                                  [...]
+| datafusion.execution.parquet.enable_page_index             | false      | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                                                                                                     [...]
+| datafusion.execution.parquet.pruning                       | true       | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file                                                                                                                                                                                                                                                                  [...]
+| datafusion.execution.parquet.skip_metadata                 | true       | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata                                                                                                                                                                                [...]
+| datafusion.execution.parquet.metadata_size_hint            | NULL       | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer                                                                                                                                                          [...]
+| datafusion.execution.parquet.pushdown_filters              | false      | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded                                                                                                                                                                                                                                                                                                            [...]
+| datafusion.execution.parquet.reorder_filters               | false      | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query                                                                                                                                                                                                      [...]
+| datafusion.optimizer.enable_round_robin_repartition        | true       | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores                                                                                                                                                                                                                                                                                      [...]
+| datafusion.optimizer.filter_null_join_keys                 | false      | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                                                                                                  [...]
+| datafusion.optimizer.repartition_aggregations              | true       | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level                                                                                                                                                                                                                                                                                       [...]
+| datafusion.optimizer.repartition_file_min_size             | 10485760   | Minimum total files size in bytes to perform file scan repartitioning.                                                                                                                                                                                                                                                                                                                                                           [...]
+| datafusion.optimizer.repartition_joins                     | true       | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level                                                                                                                                                                                                                                                                                                 [...]
+| datafusion.optimizer.allow_symmetric_joins_without_pruning | true       | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in  [...]
+| datafusion.optimizer.repartition_file_scans                | true       | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format in which case multiple row groups from the same file may be read concurrently. If false then each row group is read serially, though different files may be read in parallel.                                                                                                                    [...]
+| datafusion.optimizer.repartition_windows                   | true       | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level                                                                                                                                                                                                                                                                                [...]
+| datafusion.optimizer.repartition_sorts                     | true       | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", would turn into the plan below which performs better in multithreaded environments "SortPreservingMergeExec: [a@ [...]
+| datafusion.optimizer.skip_failed_rules                     | true       | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail                                                                                                                                                                                             [...]
+| datafusion.optimizer.max_passes                            | 3          | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                                                                                                                                             [...]
+| datafusion.optimizer.top_down_join_key_reordering          | true       | When set to true, the physical plan optimizer will run a top down process to reorder the join keys                                                                                                                                                                                                                                                                                                                               [...]
+| datafusion.optimizer.prefer_hash_join                      | true       | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory                                                                                                                                                                                                                                                            [...]
+| datafusion.optimizer.hash_join_single_partition_threshold  | 1048576    | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                                                                                                   [...]
+| datafusion.explain.logical_plan_only                       | false      | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                                                                                                                                            [...]
+| datafusion.explain.physical_plan_only                      | false      | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                                                                                                                                           [...]
+| datafusion.sql_parser.parse_float_as_decimal               | false      | When set to true, SQL parser will parse float as decimal type                                                                                                                                                                                                                                                                                                                                                                    [...]
+| datafusion.sql_parser.enable_ident_normalization           | true       | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)                                                                                                                                                                                                                                                                                                                                   [...]