You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by oz...@apache.org on 2023/12/19 15:22:40 UTC

(arrow-datafusion) branch main updated: Increase test coverage for unbounded and bounded cases (#8581)

This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bc61b31ae Increase test coverage for unbounded and bounded cases (#8581)
9bc61b31ae is described below

commit 9bc61b31ae4f67c55c03214c9b807079e4fe0f44
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Dec 19 18:22:33 2023 +0300

    Increase test coverage for unbounded and bounded cases (#8581)
    
    * Re-introduce unbounded tests with new executor
    
    * Remove unnecessary test
    
    * Enhance test coverage
    
    * Review
    
    * Test passes
    
    * Change argument order
    
    * Parametrize enforce sorting test
    
    * Imports
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../core/src/physical_optimizer/enforce_sorting.rs |  92 ++-
 .../replace_with_order_preserving_variants.rs      | 714 ++++++++++++++++++---
 datafusion/core/src/test/mod.rs                    |  28 +-
 3 files changed, 697 insertions(+), 137 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 2b650a4269..2ecc1e11b9 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -60,8 +60,8 @@ use crate::physical_plan::{
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_common::{plan_err, DataFusionError};
 use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
-
 use datafusion_physical_plan::repartition::RepartitionExec;
+
 use itertools::izip;
 
 /// This rule inspects [`SortExec`]'s in the given physical plan and removes the
@@ -769,7 +769,7 @@ mod tests {
     use crate::physical_plan::repartition::RepartitionExec;
     use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::{SessionConfig, SessionContext};
-    use crate::test::{csv_exec_sorted, stream_exec_ordered};
+    use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};
 
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -777,6 +777,8 @@ mod tests {
     use datafusion_expr::JoinType;
     use datafusion_physical_expr::expressions::{col, Column, NotExpr};
 
+    use rstest::rstest;
+
     fn create_test_schema() -> Result<SchemaRef> {
         let nullable_column = Field::new("nullable_col", DataType::Int32, true);
         let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -2140,12 +2142,19 @@ mod tests {
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_with_lost_ordering_unbounded() -> Result<()> {
+    async fn test_with_lost_ordering_unbounded_bounded(
+        #[values(false, true)] source_unbounded: bool,
+    ) -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        // create an unbounded source
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        // create either bounded or unbounded source
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_ordered(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec(source);
         let repartition_hash = Arc::new(RepartitionExec::try_new(
             repartition_rr,
@@ -2154,50 +2163,71 @@ mod tests {
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
         let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = vec![
             "SortExec: expr=[a@0 ASC]",
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "        StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = vec![
+            "SortExec: expr=[a@0 ASC]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = vec![
             "SortPreservingMergeExec: [a@0 ASC]",
             "  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "      StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
-        Ok(())
-    }
 
-    #[tokio::test]
-    async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
-        let schema = create_test_schema3()?;
-        let sort_exprs = vec![sort_expr("a", &schema)];
-        // create an unbounded source
-        let source = stream_exec_ordered(&schema, sort_exprs);
-        let repartition_rr = repartition_exec(source);
-        let repartition_hash = Arc::new(RepartitionExec::try_new(
-            repartition_rr,
-            Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
-        )?) as _;
-        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
-        let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
-
-        let expected_input = ["SortExec: expr=[a@0 ASC]",
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = vec![
+            "SortExec: expr=[a@0 ASC]",
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "        StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
         ];
-        let expected_optimized = [
+        let expected_optimized_bounded_parallelize_sort = vec![
             "SortPreservingMergeExec: [a@0 ASC]",
-            "  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "      StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, false);
+        let (expected_input, expected_optimized, expected_optimized_sort_parallelize) =
+            if source_unbounded {
+                (
+                    expected_input_unbounded,
+                    expected_optimized_unbounded.clone(),
+                    expected_optimized_unbounded,
+                )
+            } else {
+                (
+                    expected_input_bounded,
+                    expected_optimized_bounded,
+                    expected_optimized_bounded_parallelize_sort,
+                )
+            };
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan.clone(),
+            false
+        );
+        assert_optimized!(
+            expected_input,
+            expected_optimized_sort_parallelize,
+            physical_plan,
+            true
+        );
         Ok(())
     }
 
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 671891be43..0ff7e9f48e 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -276,6 +276,9 @@ pub(crate) fn replace_with_order_preserving_variants(
 mod tests {
     use super::*;
 
+    use crate::datasource::file_format::file_compression_type::FileCompressionType;
+    use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
     use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::filter::FilterExec;
@@ -285,35 +288,95 @@ mod tests {
     use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::SessionConfig;
-
     use crate::test::TestStreamPartition;
+
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::tree_node::TreeNode;
-    use datafusion_common::Result;
+    use datafusion_common::{Result, Statistics};
+    use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{JoinType, Operator};
     use datafusion_physical_expr::expressions::{self, col, Column};
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::streaming::StreamingTableExec;
+
     use rstest::rstest;
 
-    /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan
-    /// against the original and expected plans.
+    /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
+    /// the plan against the original and expected plans for both bounded and
+    /// unbounded cases.
     ///
-    /// `$EXPECTED_PLAN_LINES`: input plan
-    /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan
-    /// `$PLAN`: the plan to optimized
-    /// `$ALLOW_BOUNDED`: whether to allow the plan to be optimized for bounded cases
-    macro_rules! assert_optimized {
-        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
+    /// # Parameters
+    ///
+    /// * `$EXPECTED_UNBOUNDED_PLAN_LINES`: Expected input unbounded plan.
+    /// * `$EXPECTED_BOUNDED_PLAN_LINES`: Expected input bounded plan.
+    /// * `$EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: Expected optimized plan for
+    ///   unbounded cases, whatever the value of the `prefer_existing_sort` flag.
+    /// * `$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: Expected optimized plan for
+    ///   bounded cases when the flag `prefer_existing_sort` is `false`.
+    /// * `$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Expected optimized
+    ///   plan for bounded cases when the flag `prefer_existing_sort` is `true`.
+    /// * `$PLAN`: The plan to optimize.
+    /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source.
+    macro_rules! assert_optimized_in_all_boundedness_situations {
+        ($EXPECTED_UNBOUNDED_PLAN_LINES: expr,  $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => {
+            if $SOURCE_UNBOUNDED {
+                assert_optimized_prefer_sort_on_off!(
+                    $EXPECTED_UNBOUNDED_PLAN_LINES,
+                    $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
+                    $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
+                    $PLAN
+                );
+            } else {
+                assert_optimized_prefer_sort_on_off!(
+                    $EXPECTED_BOUNDED_PLAN_LINES,
+                    $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES,
+                    $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
+                    $PLAN
+                );
+            }
+        };
+    }
+
+    /// Runs the `replace_with_order_preserving_variants` sub-rule with the flag
+    /// `prefer_existing_sort` off and then on, asserting the plan in both cases.
+    ///
+    /// # Parameters
+    ///
+    /// * `$EXPECTED_PLAN_LINES`: Expected input plan.
+    /// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag
+    ///   `prefer_existing_sort` is `false`.
+    /// * `$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan when
+    ///   the flag `prefer_existing_sort` is `true`.
+    /// * `$PLAN`: The plan to optimize.
+    macro_rules! assert_optimized_prefer_sort_on_off {
+        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
             assert_optimized!(
                 $EXPECTED_PLAN_LINES,
                 $EXPECTED_OPTIMIZED_PLAN_LINES,
-                $PLAN,
+                $PLAN.clone(),
                 false
             );
+            assert_optimized!(
+                $EXPECTED_PLAN_LINES,
+                $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
+                $PLAN,
+                true
+            );
         };
-        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $ALLOW_BOUNDED: expr) => {
+    }
+
+    /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
+    /// the plan against the original and expected plans.
+    ///
+    /// # Parameters
+    ///
+    /// * `$EXPECTED_PLAN_LINES`: Expected input plan.
+    /// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Expected optimized plan.
+    /// * `$PLAN`: The plan to optimize.
+    /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag.
+    macro_rules! assert_optimized {
+        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => {
             let physical_plan = $PLAN;
             let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
             let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -329,8 +392,7 @@ mod tests {
             let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect();
 
             // Run the rule top-down
-            // let optimized_physical_plan = physical_plan.transform_down(&replace_repartition_execs)?;
-            let config = SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED);
+            let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT);
             let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan);
             let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options()))?;
             let optimized_physical_plan = parallel.plan;
@@ -348,35 +410,67 @@ mod tests {
     #[tokio::test]
     // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected
     async fn test_replace_multiple_input_repartition_1(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
         let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
 
         let physical_plan =
             sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "        StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "      StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -384,11 +478,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_with_inter_children_change_only(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr_default("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -408,7 +506,8 @@ mod tests {
             sort2,
         );
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  SortExec: expr=[a@0 ASC]",
             "    FilterExec: c@1 > 3",
@@ -420,8 +519,21 @@ mod tests {
             "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "                  StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          SortExec: expr=[a@0 ASC]",
+            "            CoalescePartitionsExec",
+            "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+        ];
 
-        let expected_optimized = [
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  FilterExec: c@1 > 3",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
@@ -431,11 +543,38 @@ mod tests {
             "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "              StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          SortExec: expr=[a@0 ASC]",
+            "            CoalescePartitionsExec",
+            "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  FilterExec: c@1 > 3",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        SortPreservingMergeExec: [a@0 ASC]",
+            "          RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -443,11 +582,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_replace_multiple_input_repartition_2(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let filter = filter_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(filter);
@@ -456,7 +599,8 @@ mod tests {
         let physical_plan =
             sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
@@ -464,18 +608,48 @@ mod tests {
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "          StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded =  [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded =  [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "        StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded =  [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -483,11 +657,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_replace_multiple_input_repartition_with_extra_steps(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -497,7 +675,8 @@ mod tests {
         let physical_plan =
             sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    CoalesceBatchesExec: target_batch_size=8192",
@@ -506,7 +685,18 @@ mod tests {
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "            StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  CoalesceBatchesExec: target_batch_size=8192",
             "    FilterExec: c@1 > 3",
@@ -514,11 +704,33 @@ mod tests {
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "          StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -526,11 +738,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_replace_multiple_input_repartition_with_extra_steps_2(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
@@ -542,7 +758,8 @@ mod tests {
         let physical_plan =
             sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    CoalesceBatchesExec: target_batch_size=8192",
@@ -552,7 +769,19 @@ mod tests {
             "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "              StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  CoalesceBatchesExec: target_batch_size=8192",
             "    FilterExec: c@1 > 3",
@@ -561,11 +790,35 @@ mod tests {
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "            StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -573,11 +826,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_not_replacing_when_no_need_to_preserve_sorting(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -586,7 +843,8 @@ mod tests {
         let physical_plan: Arc<dyn ExecutionPlan> =
             coalesce_partitions_exec(coalesce_batches_exec);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "CoalescePartitionsExec",
             "  CoalesceBatchesExec: target_batch_size=8192",
             "    FilterExec: c@1 > 3",
@@ -594,7 +852,17 @@ mod tests {
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "          StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "CoalescePartitionsExec",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "CoalescePartitionsExec",
             "  CoalesceBatchesExec: target_batch_size=8192",
             "    FilterExec: c@1 > 3",
@@ -602,11 +870,26 @@ mod tests {
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "          StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results are the same with and without the flag, because there is no executor with an ordering requirement.
+        let expected_optimized_bounded = [
+            "CoalescePartitionsExec",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
+
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -614,11 +897,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_with_multiple_replacable_repartitions(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -629,7 +916,8 @@ mod tests {
         let physical_plan =
             sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
@@ -639,7 +927,19 @@ mod tests {
             "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "              StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      CoalesceBatchesExec: target_batch_size=8192",
+            "        FilterExec: c@1 > 3",
+            "          RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    CoalesceBatchesExec: target_batch_size=8192",
@@ -648,11 +948,35 @@ mod tests {
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "            StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      CoalesceBatchesExec: target_batch_size=8192",
+            "        FilterExec: c@1 > 3",
+            "          RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -660,11 +984,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_not_replace_with_different_orderings(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let sort = sort_exec(
@@ -678,25 +1006,49 @@ mod tests {
             sort,
         );
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [c@1 ASC]",
             "  SortExec: expr=[c@1 ASC]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "        StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [c@1 ASC]",
             "  SortExec: expr=[c@1 ASC]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "        StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results are the same with and without the flag, because the ordering requirement of the executor differs from the existing ordering.
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
+
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -704,35 +1056,67 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_with_lost_ordering(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
         let physical_plan =
             sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false);
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortExec: expr=[a@0 ASC NULLS LAST]",
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "        StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        let expected_optimized = [
+        let expected_input_bounded = [
+            "SortExec: expr=[a@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "      StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortExec: expr=[a@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+            "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -740,11 +1124,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_with_lost_and_kept_ordering(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = stream_exec_ordered(&schema, sort_exprs);
+        let source = if source_unbounded {
+            stream_exec_ordered(&schema, sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, sort_exprs)
+        };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -764,7 +1152,8 @@ mod tests {
             sort2,
         );
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [c@1 ASC]",
             "  SortExec: expr=[c@1 ASC]",
             "    FilterExec: c@1 > 3",
@@ -776,8 +1165,21 @@ mod tests {
             "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "                  StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          SortExec: expr=[c@1 ASC]",
+            "            CoalescePartitionsExec",
+            "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
 
-        let expected_optimized = [
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [c@1 ASC]",
             "  FilterExec: c@1 > 3",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC",
@@ -788,11 +1190,39 @@ mod tests {
             "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "                StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results with and without flag
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "          SortExec: expr=[c@1 ASC]",
+            "            CoalescePartitionsExec",
+            "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = [
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  FilterExec: c@1 > 3",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        SortExec: expr=[c@1 ASC]",
+            "          CoalescePartitionsExec",
+            "            RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "                CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -800,19 +1230,27 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_with_multiple_child_trees(
-        #[values(false, true)] prefer_existing_sort: bool,
+        #[values(false, true)] source_unbounded: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
 
         let left_sort_exprs = vec![sort_expr("a", &schema)];
-        let left_source = stream_exec_ordered(&schema, left_sort_exprs);
+        let left_source = if source_unbounded {
+            stream_exec_ordered(&schema, left_sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, left_sort_exprs)
+        };
         let left_repartition_rr = repartition_exec_round_robin(left_source);
         let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
         let left_coalesce_partitions =
             Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
 
         let right_sort_exprs = vec![sort_expr("a", &schema)];
-        let right_source = stream_exec_ordered(&schema, right_sort_exprs);
+        let right_source = if source_unbounded {
+            stream_exec_ordered(&schema, right_sort_exprs)
+        } else {
+            csv_exec_sorted(&schema, right_sort_exprs)
+        };
         let right_repartition_rr = repartition_exec_round_robin(right_source);
         let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
         let right_coalesce_partitions =
@@ -831,7 +1269,8 @@ mod tests {
             sort,
         );
 
-        let expected_input = [
+        // Expected inputs unbounded and bounded
+        let expected_input_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  SortExec: expr=[a@0 ASC]",
             "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
@@ -844,8 +1283,22 @@ mod tests {
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "            StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
+        let expected_input_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
 
-        let expected_optimized = [
+        // Expected unbounded result (same for with and without flag)
+        let expected_optimized_unbounded = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  SortExec: expr=[a@0 ASC]",
             "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
@@ -858,11 +1311,32 @@ mod tests {
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "            StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(
-            expected_input,
-            expected_optimized,
+
+        // Expected bounded results are the same with and without the flag, because the ordering gets lost in the intermediate executors anyway. Hence, there is no need to preserve
+        // the existing ordering.
+        let expected_optimized_bounded = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
+
+        assert_optimized_in_all_boundedness_situations!(
+            expected_input_unbounded,
+            expected_input_bounded,
+            expected_optimized_unbounded,
+            expected_optimized_bounded,
+            expected_optimized_bounded_sort_preserve,
             physical_plan,
-            prefer_existing_sort
+            source_unbounded
         );
         Ok(())
     }
@@ -985,8 +1459,7 @@ mod tests {
         Ok(schema)
     }
 
-    // creates a csv exec source for the test purposes
-    // projection and has_header parameters are given static due to testing needs
+    // creates a stream exec source for the test purposes
     fn stream_exec_ordered(
         schema: &SchemaRef,
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
@@ -1007,4 +1480,35 @@ mod tests {
             .unwrap(),
         )
     }
+
+    // Creates a CSV exec source over a single placeholder file for test purposes.
+    // The projection ([0, 2, 3]) and has_header (true) are fixed due to testing needs.
+    fn csv_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        let projection: Vec<usize> = vec![0, 2, 3];
+
+        Arc::new(CsvExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new(
+                    "file_path".to_string(),
+                    100,
+                )]],
+                statistics: Statistics::new_unknown(schema),
+                projection: Some(projection),
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: vec![sort_exprs],
+            },
+            true,
+            0,
+            b'"',
+            None,
+            FileCompressionType::UNCOMPRESSED,
+        ))
+    }
 }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 7a63466a39..ed5aa15e29 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -43,13 +43,13 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, FileType, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
 
 #[cfg(feature = "compression")]
 use bzip2::write::BzEncoder;
 #[cfg(feature = "compression")]
 use bzip2::Compression as BzCompression;
-use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 #[cfg(feature = "compression")]
 use flate2::write::GzEncoder;
 #[cfg(feature = "compression")]
@@ -334,6 +334,32 @@ pub fn stream_exec_ordered(
     )
 }
 
+/// Create an unprojected `CsvExec` with the given output ordering, for use in tests
+pub fn csv_exec_ordered(
+    schema: &SchemaRef,
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+
+    Arc::new(CsvExec::new(
+        FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema: schema.clone(),
+            file_groups: vec![vec![PartitionedFile::new("file_path".to_string(), 100)]],
+            statistics: Statistics::new_unknown(schema),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![sort_exprs],
+        },
+        true,
+        0,
+        b'"',
+        None,
+        FileCompressionType::UNCOMPRESSED,
+    ))
+}
+
 /// A mock execution plan that simply returns the provided statistics
 #[derive(Debug, Clone)]
 pub struct StatisticsExec {