You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/30 17:54:17 UTC

[arrow-datafusion] branch main updated: Support Defining Ordering Equivalence at the Source (#6469)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c7bfe15b49 Support Defining Ordering Equivalence at the Source (#6469)
c7bfe15b49 is described below

commit c7bfe15b4940ebff39f466212ccf32e891db7243
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Tue May 30 20:54:10 2023 +0300

    Support Defining Ordering Equivalence at the Source (#6469)
    
    * Convert ordering equivalence to vec (unit test)
    
    * compiles
    
    * Simplifications
    
    * simplifications
    
    * Remove unnecessary codes
    
    * simplifications
    
    * Add test cases
    
    * fix bug
    
    * simplifications
    
    * Resolve linter errors
    
    * remove unnecessary codes
    
    * simplifications
    
    * simplifications
    
    * Remove unnecessary codes
    
    * Add pruning to ordering_equivalence projection
    
    * Remove unnecessary clones
    
    * Convert get range to calculate compatible ranges
    
    * Multiple ordering is supported, plan display will be fixed
    
    * Simplifications
    
    * Update comments
    
    * Minor changes
    
    * Reduce code duplication for ordering_eq_prop function
    
    * Simplifications
    
    * simplifications
    
    * simplifications
    
    * simplifications
    
    * Simplifications
    
    * Comments/API change/Proto naming
    
    * Update comments
    
    * Use builder style for ordering equivalence creation
    
    * Adapt order_by_to_sort_expr() change
    
    * Move LexOrdering definition to common place
    
    * All Vec<OrderByExpr>'s are replaced with new type LexOrdering
    
    ---------
    
    Co-authored-by: Mustafa Akur <mu...@synnada.ai>
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion-cli/Cargo.lock                          |  30 ++--
 datafusion-examples/examples/csv_opener.rs         |   2 +-
 datafusion-examples/examples/json_opener.rs        |   2 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   2 +-
 .../core/src/datasource/file_format/options.rs     |   2 +-
 datafusion/core/src/datasource/listing/table.rs    |  72 ++++----
 .../core/src/datasource/listing_table_factory.rs   |   8 +-
 .../combine_partial_final_agg.rs                   |   2 +-
 .../src/physical_optimizer/dist_enforcement.rs     |   8 +-
 .../core/src/physical_optimizer/repartition.rs     |   8 +-
 .../src/physical_optimizer/sort_enforcement.rs     |   4 +-
 .../src/physical_plan/file_format/arrow_file.rs    |  20 ++-
 .../core/src/physical_plan/file_format/avro.rs     |  28 +++-
 .../core/src/physical_plan/file_format/csv.rs      |  23 ++-
 .../src/physical_plan/file_format/file_stream.rs   |   2 +-
 .../core/src/physical_plan/file_format/json.rs     |  31 ++--
 .../core/src/physical_plan/file_format/mod.rs      |  33 ++--
 .../core/src/physical_plan/file_format/parquet.rs  |  54 +++---
 .../src/physical_plan/joins/symmetric_hash_join.rs |  18 +-
 datafusion/core/src/physical_plan/mod.rs           |  39 ++++-
 datafusion/core/src/test/mod.rs                    |   4 +-
 datafusion/core/src/test_util/mod.rs               |   2 +-
 datafusion/core/src/test_util/parquet.rs           |   2 +-
 datafusion/core/tests/fifo.rs                      |  18 +-
 datafusion/core/tests/parquet/custom_reader.rs     |   2 +-
 datafusion/core/tests/parquet/page_pruning.rs      |   2 +-
 datafusion/core/tests/row.rs                       |   2 +-
 datafusion/core/tests/sql/order.rs                 |   5 +-
 datafusion/core/tests/sql/parquet.rs               |   6 +-
 .../test_files/create_external_table.slt           |   4 -
 .../tests/sqllogictests/test_files/groupby.slt     |  55 ++++++
 datafusion/expr/src/logical_plan/ddl.rs            |   2 +-
 datafusion/physical-expr/src/equivalence.rs        |  19 +++
 datafusion/physical-expr/src/lib.rs                |   2 +-
 datafusion/physical-expr/src/sort_expr.rs          |   3 +
 datafusion/proto/proto/datafusion.proto            |  14 +-
 datafusion/proto/src/generated/pbjson.rs           | 184 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  18 +-
 datafusion/proto/src/logical_plan/mod.rs           | 107 ++++++------
 datafusion/proto/src/physical_plan/from_proto.rs   |  43 +++--
 datafusion/proto/src/physical_plan/mod.rs          |   2 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |  32 ++--
 datafusion/sql/src/parser.rs                       |  28 ++--
 datafusion/sql/src/statement.rs                    |  42 ++---
 datafusion/substrait/src/physical_plan/consumer.rs |   2 +-
 .../substrait/tests/roundtrip_physical_plan.rs     |   2 +-
 46 files changed, 686 insertions(+), 304 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 57513528be..6cc9fa2011 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -302,7 +302,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -1370,7 +1370,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -2233,7 +2233,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -2292,9 +2292,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.58"
+version = "1.0.59"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8"
+checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b"
 dependencies = [
  "unicode-ident",
 ]
@@ -2311,9 +2311,9 @@ dependencies = [
 
 [[package]]
 name = "quote"
-version = "1.0.27"
+version = "1.0.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500"
+checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488"
 dependencies = [
  "proc-macro2",
 ]
@@ -2659,7 +2659,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -2838,9 +2838,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.16"
+version = "2.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01"
+checksum = "45b6ddbb36c5b969c182aec3c4a0bce7df3fbad4b77114706a49aacc80567388"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2892,7 +2892,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -2982,7 +2982,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -3080,7 +3080,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
 ]
 
 [[package]]
@@ -3244,7 +3244,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
  "wasm-bindgen-shared",
 ]
 
@@ -3278,7 +3278,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.16",
+ "syn 2.0.17",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 0143b0297e..351f95cd2c 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -64,7 +64,7 @@ async fn main() -> Result<()> {
         projection: Some(vec![12, 0]),
         limit: Some(5),
         table_partition_cols: vec![],
-        output_ordering: None,
+        output_ordering: vec![],
         infinite_source: false,
     };
 
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 843bed4f61..6c7d5ae3d7 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -68,7 +68,7 @@ async fn main() -> Result<()> {
         projection: Some(vec![1, 0]),
         limit: Some(5),
         table_partition_cols: vec![],
-        output_ordering: None,
+        output_ordering: vec![],
         infinite_source: false,
     };
 
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 28f798ade4..cffd6b9caf 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -141,7 +141,7 @@ pub(crate) mod test_util {
                     projection,
                     limit,
                     table_partition_cols: vec![],
-                    output_ordering: None,
+                    output_ordering: vec![],
                     infinite_source: false,
                 },
                 None,
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 4024da3313..3e802362d3 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -443,7 +443,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
             // TODO: Add file sort order into CsvReadOptions and introduce here.
-            .with_file_sort_order(None)
+            .with_file_sort_order(vec![])
             .with_infinite_source(self.infinite)
     }
 
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 8bb3818c91..bea3f837cb 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -27,7 +27,7 @@ use dashmap::DashMap;
 use datafusion_common::ToDFSchema;
 use datafusion_expr::expr::Sort;
 use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, PhysicalSortExpr};
+use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
 use futures::{future, stream, StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -44,14 +44,12 @@ use crate::datasource::{
 };
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::physical_plan;
+use crate::physical_plan::file_format::FileScanConfig;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
     logical_expr::Expr,
-    physical_plan::{
-        empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
-        Statistics,
-    },
+    physical_plan::{empty::EmptyExec, project_schema, ExecutionPlan, Statistics},
 };
 
 use super::PartitionedFile;
@@ -222,7 +220,7 @@ pub struct ListingOptions {
     /// Group files to avoid that the number of partitions exceeds
     /// this limit
     pub target_partitions: usize,
-    /// Optional pre-known sort order. Must be `SortExpr`s.
+    /// Optional pre-known sort order(s). Must be `SortExpr`s.
     ///
     /// DataFusion may take advantage of this ordering to omit sorts
     /// or use more efficient algorithms. Currently sortedness must be
@@ -231,7 +229,12 @@ pub struct ListingOptions {
     /// parquet metadata.
     ///
     /// See <https://github.com/apache/arrow-datafusion/issues/4177>
-    pub file_sort_order: Option<Vec<Expr>>,
+    /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
+    ///       where each ordering consists of an individual lexicographic
+    ///       ordering (encapsulated by a `Vec<Expr>`). If there aren't
+    ///       multiple equivalent orderings, the outer `Vec` will have a
+    ///       single element.
+    pub file_sort_order: Vec<Vec<Expr>>,
     /// Infinite source means that the input is not guaranteed to end.
     /// Currently, CSV, JSON, and AVRO formats are supported.
     /// In order to support infinite inputs, DataFusion may adjust query
@@ -253,7 +256,7 @@ impl ListingOptions {
             table_partition_cols: vec![],
             collect_stat: true,
             target_partitions: 1,
-            file_sort_order: None,
+            file_sort_order: vec![],
             infinite_source: false,
         }
     }
@@ -407,9 +410,9 @@ impl ListingOptions {
     /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
     ///  // Tell datafusion that the files are sorted by column "a"
-    ///  let file_sort_order = Some(vec![
+    ///  let file_sort_order = vec![vec![
     ///    col("a").sort(true, true)
-    ///  ]);
+    ///  ]];
     ///
     /// let listing_options = ListingOptions::new(Arc::new(
     ///     ParquetFormat::default()
@@ -418,7 +421,7 @@ impl ListingOptions {
     ///
     /// assert_eq!(listing_options.file_sort_order, file_sort_order);
     /// ```
-    pub fn with_file_sort_order(mut self, file_sort_order: Option<Vec<Expr>>) -> Self {
+    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
         self.file_sort_order = file_sort_order;
         self
     }
@@ -612,16 +615,12 @@ impl ListingTable {
     }
 
     /// If file_sort_order is specified, creates the appropriate physical expressions
-    fn try_create_output_ordering(&self) -> Result<Option<Vec<PhysicalSortExpr>>> {
-        let file_sort_order =
-            if let Some(file_sort_order) = self.options.file_sort_order.as_ref() {
-                file_sort_order
-            } else {
-                return Ok(None);
-            };
+    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
+        let mut all_sort_orders = vec![];
 
-        // convert each expr to a physical sort expr
-        let sort_exprs = file_sort_order
+        for exprs in &self.options.file_sort_order {
+            // Construct PhysicalSortExpr objects from Expr objects:
+            let sort_exprs = exprs
             .iter()
             .map(|expr| {
                 if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
@@ -637,7 +636,7 @@ impl ListingTable {
                     }
                     else {
                         Err(DataFusionError::Plan(
-                            format!("Only support single column references in output_ordering, got {expr:?}")
+                            format!("Expected single column references in output_ordering, got {expr:?}")
                         ))
                     }
                 } else {
@@ -647,8 +646,9 @@ impl ListingTable {
                 }
             })
             .collect::<Result<Vec<_>>>()?;
-
-        Ok(Some(sort_exprs))
+            all_sort_orders.push(sort_exprs);
+        }
+        Ok(all_sort_orders)
     }
 }
 
@@ -956,40 +956,38 @@ mod tests {
 
         // (file_sort_order, expected_result)
         let cases = vec![
-            (None, Ok(None)),
-            // empty list
-            (Some(vec![]), Ok(Some(vec![]))),
+            (vec![], Ok(vec![])),
             // not a sort expr
             (
-                Some(vec![col("string_col")]),
+                vec![vec![col("string_col")]],
                 Err("Expected Expr::Sort in output_ordering, but got string_col"),
             ),
             // sort expr, but non column
             (
-                Some(vec![
+                vec![vec![
                     col("int_col").add(lit(1)).sort(true, true),
-                ]),
-                Err("Only support single column references in output_ordering, got int_col + Int32(1)"),
+                ]],
+                Err("Expected single column references in output_ordering, got int_col + Int32(1)"),
             ),
             // ok with one column
             (
-                Some(vec![col("string_col").sort(true, false)]),
-                Ok(Some(vec![PhysicalSortExpr {
+                vec![vec![col("string_col").sort(true, false)]],
+                Ok(vec![vec![PhysicalSortExpr {
                     expr: physical_col("string_col", &schema).unwrap(),
                     options: SortOptions {
                         descending: false,
                         nulls_first: false,
                     },
-                }]))
+                }]])
 
             ),
             // ok with two columns, different options
             (
-                Some(vec![
+                vec![vec![
                     col("string_col").sort(true, false),
                     col("int_col").sort(false, true),
-                ]),
-                Ok(Some(vec![
+                ]],
+                Ok(vec![vec![
                     PhysicalSortExpr {
                         expr: physical_col("string_col", &schema).unwrap(),
                         options: SortOptions {
@@ -1004,7 +1002,7 @@ mod tests {
                             nulls_first: true,
                         },
                     },
-                ]))
+                ]])
 
             ),
 
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index dfc3d11732..7d10fc8e0e 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -128,12 +128,6 @@ impl TableProviderFactory for ListingTableFactory {
             (Some(schema), table_partition_cols)
         };
 
-        let file_sort_order = if cmd.order_exprs.is_empty() {
-            None
-        } else {
-            Some(cmd.order_exprs.clone())
-        };
-
         // look for 'infinite' as an option
         let infinite_source = cmd.unbounded;
 
@@ -143,7 +137,7 @@ impl TableProviderFactory for ListingTableFactory {
             .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
             .with_infinite_source(infinite_source)
-            .with_file_sort_order(file_sort_order);
+            .with_file_sort_order(cmd.order_exprs.clone());
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
         let resolved_schema = match provided_schema {
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 5657c62921..fdef7d54b8 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -261,7 +261,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 5c2a4d27f6..6c425b1740 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1000,11 +1000,11 @@ mod tests {
     }
 
     fn parquet_exec() -> Arc<ParquetExec> {
-        parquet_exec_with_sort(None)
+        parquet_exec_with_sort(vec![])
     }
 
     fn parquet_exec_with_sort(
-        output_ordering: Option<Vec<PhysicalSortExpr>>,
+        output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
             FileScanConfig {
@@ -1025,7 +1025,7 @@ mod tests {
 
     // Created a sorted parquet exec with multiple files
     fn parquet_exec_multiple_sorted(
-        output_ordering: Option<Vec<PhysicalSortExpr>>,
+        output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
             FileScanConfig {
@@ -2133,7 +2133,7 @@ mod tests {
         }];
 
         // Scan some sorted parquet files
-        let exec = parquet_exec_multiple_sorted(Some(sort_key.clone()));
+        let exec = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
 
         // CoalesceBatchesExec to mimic behavior after a filter
         let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 8b407ed289..7db54eee51 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -356,7 +356,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -378,7 +378,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -402,7 +402,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: Some(sort_exprs),
+                output_ordering: vec![sort_exprs],
                 infinite_source: false,
             },
             None,
@@ -429,7 +429,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: Some(sort_exprs),
+                output_ordering: vec![sort_exprs],
                 infinite_source: false,
             },
             None,
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index f71c79e9fc..22f7d509c8 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -2805,7 +2805,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -2829,7 +2829,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: Some(sort_exprs),
+                output_ordering: vec![sort_exprs],
                 infinite_source: false,
             },
             None,
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
index d229031d37..72a6d0a0b4 100644
--- a/datafusion/core/src/physical_plan/file_format/arrow_file.rs
+++ b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
@@ -22,12 +22,15 @@ use crate::physical_plan::file_format::{
 };
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+    ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
 };
 use arrow_schema::SchemaRef;
 use datafusion_common::Statistics;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{
+    LexOrdering, OrderingEquivalenceProperties, PhysicalSortExpr,
+};
 use futures::StreamExt;
 use object_store::{GetResult, ObjectStore};
 use std::any::Any;
@@ -40,7 +43,7 @@ pub struct ArrowExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    projected_output_ordering: Vec<LexOrdering>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -83,7 +86,16 @@ impl ExecutionPlan for ArrowExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering.as_deref()
+        self.projected_output_ordering
+            .first()
+            .map(|ordering| ordering.as_slice())
+    }
+
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        ordering_equivalence_properties_helper(
+            self.schema(),
+            &self.projected_output_ordering,
+        )
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 16dc146750..5b407d5498 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -17,14 +17,17 @@
 
 //! Execution plan for reading line-delimited Avro files
 use crate::error::Result;
+use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+    ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
+
 use arrow::datatypes::SchemaRef;
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
 
-use crate::execution::context::TaskContext;
-use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use std::any::Any;
 use std::sync::Arc;
 
@@ -37,7 +40,7 @@ pub struct AvroExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    projected_output_ordering: Vec<LexOrdering>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -80,7 +83,16 @@ impl ExecutionPlan for AvroExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering.as_deref()
+        self.projected_output_ordering
+            .first()
+            .map(|ordering| ordering.as_slice())
+    }
+
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        ordering_equivalence_properties_helper(
+            self.schema(),
+            &self.projected_output_ordering,
+        )
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -265,7 +277,7 @@ mod tests {
             projection: Some(vec![0, 1, 2]),
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -337,7 +349,7 @@ mod tests {
             projection,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -409,7 +421,7 @@ mod tests {
             statistics: Statistics::default(),
             limit: None,
             table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index f19cf0743a..1dde376315 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -20,6 +20,7 @@
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
+use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
@@ -27,15 +28,14 @@ use crate::physical_plan::file_format::file_stream::{
 use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+    ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
 
-use bytes::Buf;
-
-use crate::physical_plan::common::AbortOnDropSingle;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::ready;
 use futures::{StreamExt, TryStreamExt};
 use object_store::{GetResult, ObjectStore};
@@ -54,7 +54,7 @@ pub struct CsvExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    projected_output_ordering: Vec<LexOrdering>,
     has_header: bool,
     delimiter: u8,
     /// Execution metrics
@@ -121,7 +121,16 @@ impl ExecutionPlan for CsvExec {
 
     /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering.as_deref()
+        self.projected_output_ordering
+            .first()
+            .map(|ordering| ordering.as_slice())
+    }
+
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        ordering_equivalence_properties_helper(
+            self.schema(),
+            &self.projected_output_ordering,
+        )
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index d8d1b40726..2ca9a076cf 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -472,7 +472,7 @@ mod tests {
             projection: None,
             limit,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         };
         let metrics_set = ExecutionPlanMetricsSet::new();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 071c51b8c0..dcf23fdb25 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -19,6 +19,7 @@
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
+use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
@@ -26,14 +27,15 @@ use crate::physical_plan::file_format::file_stream::{
 use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+    ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
+
+use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
 
 use bytes::{Buf, Bytes};
-
-use crate::physical_plan::common::AbortOnDropSingle;
-use arrow::json::ReaderBuilder;
 use futures::{ready, stream, StreamExt, TryStreamExt};
 use object_store::{GetResult, ObjectStore};
 use std::any::Any;
@@ -52,7 +54,7 @@ pub struct NdJsonExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    projected_output_ordering: Vec<LexOrdering>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     file_compression_type: FileCompressionType,
@@ -101,7 +103,16 @@ impl ExecutionPlan for NdJsonExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering.as_deref()
+        self.projected_output_ordering
+            .first()
+            .map(|ordering| ordering.as_slice())
+    }
+
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        ordering_equivalence_properties_helper(
+            self.schema(),
+            &self.projected_output_ordering,
+        )
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -427,7 +438,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             file_compression_type.to_owned(),
@@ -503,7 +514,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             file_compression_type.to_owned(),
@@ -549,7 +560,7 @@ mod tests {
                 projection: Some(vec![0, 2]),
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             file_compression_type.to_owned(),
@@ -600,7 +611,7 @@ mod tests {
                 projection: Some(vec![3, 0, 2]),
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             file_compression_type.to_owned(),
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index a05c0cf8c8..72dd262b05 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -38,7 +38,7 @@ use arrow::{
 };
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
 pub(crate) use json::plan_to_json;
 pub use json::{JsonOpener, NdJsonExec};
@@ -155,15 +155,15 @@ pub struct FileScanConfig {
     pub limit: Option<usize>,
     /// The partitioning columns
     pub table_partition_cols: Vec<(String, DataType)>,
-    /// The order in which the data is sorted, if known.
-    pub output_ordering: Option<Vec<PhysicalSortExpr>>,
+    /// All equivalent lexicographical orderings in which the data is known to be sorted (empty if unknown).
+    pub output_ordering: Vec<LexOrdering>,
     /// Indicates whether this plan may produce an infinite stream of records.
     pub infinite_source: bool,
 }
 
 impl FileScanConfig {
     /// Project the schema and the statistics on the given column indices
-    fn project(&self) -> (SchemaRef, Statistics, Option<Vec<PhysicalSortExpr>>) {
+    fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
             return (
                 Arc::clone(&self.file_schema),
@@ -250,7 +250,7 @@ impl Debug for FileScanConfig {
 
 impl Display for FileScanConfig {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
-        let (schema, _, ordering) = self.project();
+        let (schema, _, orderings) = self.project();
 
         write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?;
 
@@ -266,11 +266,12 @@ impl Display for FileScanConfig {
             write!(f, ", infinite_source=true")?;
         }
 
-        if let Some(orders) = ordering {
-            if !orders.is_empty() {
-                write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?;
+        if let Some(ordering) = orderings.first() {
+            if !ordering.is_empty() {
+                write!(f, ", output_ordering={}", OutputOrderingDisplay(ordering))?;
             }
         }
+
         Ok(())
     }
 }
@@ -847,14 +848,15 @@ impl From<ObjectMeta> for FileMeta {
 fn get_projected_output_ordering(
     base_config: &FileScanConfig,
     projected_schema: &SchemaRef,
-) -> Option<Vec<PhysicalSortExpr>> {
-    let mut new_ordering = vec![];
-    if let Some(output_ordering) = &base_config.output_ordering {
+) -> Vec<Vec<PhysicalSortExpr>> {
+    let mut all_orderings = vec![];
+    for output_ordering in &base_config.output_ordering {
         if base_config.file_groups.iter().any(|group| group.len() > 1) {
             debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
-                   output_ordering, base_config.file_groups);
-            return None;
+            base_config.output_ordering[0], base_config.file_groups);
+            return vec![];
         }
+        let mut new_ordering = vec![];
         for PhysicalSortExpr { expr, options } in output_ordering {
             if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                 let name = col.name();
@@ -871,8 +873,9 @@ fn get_projected_output_ordering(
             // since rest of the orderings are violated
             break;
         }
+        all_orderings.push(new_ordering);
     }
-    (!new_ordering.is_empty()).then_some(new_ordering)
+    all_orderings
 }
 
 #[cfg(test)]
@@ -1330,7 +1333,7 @@ mod tests {
             projection,
             statistics,
             table_partition_cols,
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         }
     }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 9843ecc22e..3b622423f2 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,8 +17,6 @@
 
 //! Execution plan for reading Parquet files
 
-use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_physical_expr::PhysicalExpr;
 use fmt::Debug;
 use std::any::Any;
 use std::cmp::min;
@@ -27,25 +25,32 @@ use std::fs;
 use std::ops::Range;
 use std::sync::Arc;
 
-use crate::config::ConfigOptions;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
-use crate::physical_plan::file_format::FileMeta;
+use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
 use crate::{
+    config::ConfigOptions,
     datasource::listing::FileRange,
     error::{DataFusionError, Result},
     execution::context::TaskContext,
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
+        common::AbortOnDropSingle,
         expressions::PhysicalSortExpr,
-        file_format::{FileScanConfig, SchemaAdapter},
+        file_format::{FileMeta, FileScanConfig, SchemaAdapter},
         metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+        Partitioning, SendableRecordBatchStream, Statistics,
     },
 };
+
+use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::ArrowError;
+use datafusion_physical_expr::{
+    LexOrdering, OrderingEquivalenceProperties, PhysicalExpr,
+};
+
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{StreamExt, TryStreamExt};
@@ -64,8 +69,6 @@ mod page_filter;
 mod row_filter;
 mod row_groups;
 
-use crate::physical_plan::common::AbortOnDropSingle;
-use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
 pub use metrics::ParquetFileMetrics;
 
 #[derive(Default)]
@@ -90,7 +93,7 @@ pub struct ParquetExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    projected_output_ordering: Vec<LexOrdering>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
@@ -342,7 +345,16 @@ impl ExecutionPlan for ParquetExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering.as_deref()
+        self.projected_output_ordering
+            .first()
+            .map(|ordering| ordering.as_slice())
+    }
+
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        ordering_equivalence_properties_helper(
+            self.schema(),
+            &self.projected_output_ordering,
+        )
     }
 
     fn with_new_children(
@@ -900,7 +912,7 @@ mod tests {
                     projection,
                     limit: None,
                     table_partition_cols: vec![],
-                    output_ordering: None,
+                    output_ordering: vec![],
                     infinite_source: false,
                 },
                 predicate,
@@ -1549,7 +1561,7 @@ mod tests {
                     projection: None,
                     limit: None,
                     table_partition_cols: vec![],
-                    output_ordering: None,
+                    output_ordering: vec![],
                     infinite_source: false,
                 },
                 None,
@@ -1663,7 +1675,7 @@ mod tests {
                         ),
                     ),
                 ],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -1725,7 +1737,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -1917,7 +1929,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -1954,7 +1966,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -1996,7 +2008,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -2034,7 +2046,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -2073,7 +2085,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
@@ -2102,7 +2114,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 5602ed4cf3..3bd177e03b 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -2415,16 +2415,14 @@ mod tests {
             Field::new("a2", DataType::UInt32, false),
         ]));
         // Specify the ordering:
-        let file_sort_order = Some(
-            [datafusion_expr::col("a1")]
-                .into_iter()
-                .map(|e| {
-                    let ascending = true;
-                    let nulls_first = false;
-                    e.sort(ascending, nulls_first)
-                })
-                .collect::<Vec<_>>(),
-        );
+        let file_sort_order = vec![[datafusion_expr::col("a1")]
+            .into_iter()
+            .map(|e| {
+                let ascending = true;
+                let nulls_first = false;
+                e.sort(ascending, nulls_first)
+            })
+            .collect::<Vec<_>>()];
         register_unbounded_file_with_ordering(
             &ctx,
             schema.clone(),
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index a9881ad435..2c3a3e9b49 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -576,6 +576,43 @@ impl PartialEq for Partitioning {
     }
 }
 
+/// Builds the ordering equivalence properties for `schema`, declaring each later entry of `eq_orderings` equivalent to the first one.
+pub fn ordering_equivalence_properties_helper(
+    schema: SchemaRef,
+    eq_orderings: &[LexOrdering],
+) -> OrderingEquivalenceProperties {
+    let mut oep = OrderingEquivalenceProperties::new(schema);
+    let first_ordering = if let Some(first) = eq_orderings.first() {
+        first
+    } else {
+        // No ordering was supplied, so there is nothing to relate; return `oep` as created:
+        return oep;
+    };
+    let first_column = first_ordering
+        .iter()
+        .map(|e| TryFrom::try_from(e.clone())) // fallible, per-expression conversion
+        .collect::<Result<Vec<_>>>(); // `Err` as soon as one expression fails to convert
+    let checked_column_first = if let Ok(first) = first_column {
+        first
+    } else {
+        // The head ordering could not be converted, so no equivalence can be stated; return `oep` as created:
+        return oep;
+    };
+    // The first entry of `eq_orderings` is the head that the others are equated to, so skip it:
+    for ordering in eq_orderings.iter().skip(1) {
+        let column = ordering
+            .iter()
+            .map(|e| TryFrom::try_from(e.clone()))
+            .collect::<Result<Vec<_>>>();
+        if let Ok(column) = column { // orderings that fail to convert are silently skipped
+            if !column.is_empty() { // NOTE(review): the head ordering gets no such emptiness check; confirm an empty head is acceptable
+                oep.add_equal_conditions((&checked_column_first, &column))
+            }
+        }
+    }
+    oep
+}
+
 /// Distribution schemes
 #[derive(Debug, Clone)]
 pub enum Distribution {
@@ -606,7 +643,7 @@ impl Distribution {
 use datafusion_physical_expr::expressions::Column;
 pub use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{
-    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
+    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, LexOrdering,
 };
 pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
 use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index f99071a6f1..6e4d038b4a 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -195,7 +195,7 @@ pub fn partitioned_csv_config(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
-        output_ordering: None,
+        output_ordering: vec![],
         infinite_source: false,
     })
 }
@@ -344,7 +344,7 @@ pub fn csv_exec_sorted(
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: Some(sort_exprs),
+            output_ordering: vec![sort_exprs],
             infinite_source,
         },
         false,
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index d42379b82a..993ca9c186 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -518,7 +518,7 @@ pub async fn register_unbounded_file_with_ordering(
     schema: SchemaRef,
     file_path: &Path,
     table_name: &str,
-    file_sort_order: Option<Vec<Expr>>,
+    file_sort_order: Vec<Vec<Expr>>,
     with_unbounded_execution: bool,
 ) -> Result<()> {
     // Mark infinite and provide schema:
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 14215cc9a0..17766142bd 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -150,7 +150,7 @@ impl TestParquetFile {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         };
 
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index f5a62ddee4..e972d39f4a 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -254,16 +254,14 @@ mod unix_test {
             Field::new("a2", DataType::UInt32, false),
         ]));
         // Specify the ordering:
-        let file_sort_order = Some(
-            [datafusion_expr::col("a1")]
-                .into_iter()
-                .map(|e| {
-                    let ascending = true;
-                    let nulls_first = false;
-                    e.sort(ascending, nulls_first)
-                })
-                .collect::<Vec<_>>(),
-        );
+        let file_sort_order = vec![[datafusion_expr::col("a1")]
+            .into_iter()
+            .map(|e| {
+                let ascending = true;
+                let nulls_first = false;
+                e.sort(ascending, nulls_first)
+            })
+            .collect::<Vec<_>>()];
         // Set unbounded sorted files read configuration
         register_unbounded_file_with_ordering(
             &ctx,
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 62ca6ae859..501623dbd9 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -82,7 +82,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         },
         None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 1d444326bb..8be7aeb0ca 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -76,7 +76,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         },
         Some(predicate),
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 55310bb611..f5cb0e4804 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -87,7 +87,7 @@ async fn get_exec(
                 projection: projection.cloned(),
                 limit,
                 table_partition_cols: vec![],
-                output_ordering: None,
+                output_ordering: vec![],
                 infinite_source: false,
             },
             None,
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
index fa6ac61244..07254c8044 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -56,10 +56,7 @@ async fn create_external_table_with_order() -> Result<()> {
     let table_dyn = listing_table_factory.create(&ctx.state(), &cmd).await?;
     let table = table_dyn.as_any().downcast_ref::<ListingTable>().unwrap();
     assert_eq!(cmd.order_exprs.len(), 1);
-    assert_eq!(
-        &cmd.order_exprs,
-        table.options().file_sort_order.as_ref().unwrap()
-    );
+    assert_eq!(cmd.order_exprs, table.options().file_sort_order);
     Ok(())
 }
 
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 532a0414f9..0d780c6694 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -58,9 +58,7 @@ async fn parquet_with_sort_order_specified() {
     let session_config = SessionConfig::new().with_target_partitions(2);
 
     // The sort order is not specified
-    let options_no_sort = parquet_read_options
-        .to_listing_options(&session_config)
-        .with_file_sort_order(None);
+    let options_no_sort = parquet_read_options.to_listing_options(&session_config);
 
     // The sort order is specified (not actually correct in this case)
     let file_sort_order = [col("string_col"), col("int_col")]
@@ -74,7 +72,7 @@ async fn parquet_with_sort_order_specified() {
 
     let options_sort = parquet_read_options
         .to_listing_options(&session_config)
-        .with_file_sort_order(Some(file_sort_order));
+        .with_file_sort_order(vec![file_sort_order]);
 
     // This string appears in ParquetExec if the output ordering is
     // specified
diff --git a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
index ce6fdfeee0..c08d5a55c3 100644
--- a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
@@ -72,10 +72,6 @@ CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv' LOCATION 'bar.csv'
 statement error DataFusion error: SQL error: ParserError\("WITH HEADER ROW specified more than once"\)
 CREATE EXTERNAL TABLE t STORED AS CSV WITH HEADER ROW WITH HEADER ROW LOCATION 'foo.csv'
 
-# Duplicate `WITH ORDER` clause
-statement error DataFusion error: SQL error: ParserError\("WITH ORDER specified more than once"\)
-CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1) WITH ORDER (c1) LOCATION 'foo.csv'
-
 # Duplicate `DELIMITER` clause
 statement error DataFusion error: SQL error: ParserError\("DELIMITER specified more than once"\)
 CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV DELIMITER '-' DELIMITER '+' LOCATION 'foo.csv'
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 8bac60bbba..08523648dc 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1940,6 +1940,61 @@ WITH HEADER ROW
 WITH ORDER (a ASC, b ASC, c ASC)
 LOCATION 'tests/data/window_2.csv';
 
+# Create a table with 2 declared orderings: (a ASC, b ASC) and (c ASC).
+# The queries below check which SortExecs the planner can remove.
+statement ok
+CREATE EXTERNAL TABLE multiple_ordered_table (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC)
+WITH ORDER (c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# Expected a sort exec for b DESC
+query TT
+EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY b DESC;
+----
+logical_plan
+Projection: multiple_ordered_table.a
+--Sort: multiple_ordered_table.b DESC NULLS FIRST
+----TableScan: multiple_ordered_table projection=[a, b]
+physical_plan
+ProjectionExec: expr=[a@0 as a]
+--SortExec: expr=[b@1 DESC]
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true
+
+# Final plan shouldn't have SortExec c ASC,
+# because table already satisfies this ordering.
+query TT
+EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY c ASC;
+----
+logical_plan
+Projection: multiple_ordered_table.a
+--Sort: multiple_ordered_table.c ASC NULLS LAST
+----TableScan: multiple_ordered_table projection=[a, c]
+physical_plan
+ProjectionExec: expr=[a@0 as a]
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+# Final plan shouldn't have SortExec a ASC, b ASC,
+# because table already satisfies this ordering.
+query TT
+EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY a ASC, b ASC;
+----
+logical_plan
+Projection: multiple_ordered_table.a
+--Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
+----TableScan: multiple_ordered_table projection=[a, b]
+physical_plan
+ProjectionExec: expr=[a@0 as a]
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true
+
 # test_window_agg_sort
 statement ok
 set datafusion.execution.target_partitions = 1;
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 2b02295b79..e005f11471 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -189,7 +189,7 @@ pub struct CreateExternalTable {
     /// SQL used to create the table, if available
     pub definition: Option<String>,
     /// Order expressions supplied by user
-    pub order_exprs: Vec<Expr>,
+    pub order_exprs: Vec<Vec<Expr>>,
     /// File compression type (GZIP, BZIP2, XZ, ZSTD)
     pub file_compression_type: CompressionTypeVariant,
     /// Whether the table is an infinite streams
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 6af72efc5a..83f26a1d06 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -22,6 +22,7 @@ use crate::{
 
 use arrow::datatypes::SchemaRef;
 use arrow_schema::SortOptions;
+use datafusion_common::DataFusionError;
 
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
@@ -221,6 +222,24 @@ impl From<OrderedColumn> for PhysicalSortExpr {
     }
 }
 
+impl TryFrom<PhysicalSortExpr> for OrderedColumn { // fallible: only column-based sort expressions convert
+    type Error = DataFusionError;
+
+    fn try_from(value: PhysicalSortExpr) -> Result<Self, Self::Error> {
+        if let Some(col) = value.expr.as_any().downcast_ref::<Column>() { // is the expression a plain `Column`?
+            Ok(OrderedColumn {
+                col: col.clone(),
+                options: value.options, // sort options are carried over as-is
+            })
+        } else {
+            Err(DataFusionError::NotImplemented( // every other expression kind is rejected
+                "Only Column PhysicalSortExpr's can be downcasted to OrderedColumn yet"
+                    .to_string(),
+            ))
+        }
+    }
+}
+
 impl From<OrderedColumn> for PhysicalSortRequirement {
     fn from(value: OrderedColumn) -> Self {
         PhysicalSortRequirement {
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 5dd8192def..b54bcda601 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,7 +56,7 @@ pub use equivalence::{
 pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement};
+pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement};
 pub use utils::{
     expr_list_eq_any_order, expr_list_eq_strict_order,
     normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map,
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 78cdd87db2..665a47e586 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -213,3 +213,6 @@ fn to_str(options: &SortOptions) -> &str {
         (false, false) => "ASC NULLS LAST",
     }
 }
+
+/// `LexOrdering` is a lexicographical ordering, represented as a `Vec<PhysicalSortExpr>` (one sort expression per sort key).
+pub type LexOrdering = Vec<PhysicalSortExpr>;
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 7c35452085..2f6367bc18 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -96,6 +96,10 @@ message ParquetFormat {
 
 message AvroFormat {}
 
+message LogicalExprNodeCollection { // one list of expressions; used as the element of repeated fields such as `file_sort_order`
+  repeated LogicalExprNode logical_expr_nodes = 1;
+}
+
 message ListingTableScanNode {
   reserved 1; // was string table_name
   OwnedTableReference table_name = 14;
@@ -112,7 +116,7 @@ message ListingTableScanNode {
     ParquetFormat parquet = 11;
     AvroFormat avro = 12;
   }
-  repeated LogicalExprNode file_sort_order = 13;
+  repeated LogicalExprNodeCollection file_sort_order = 13;
 }
 
 message ViewTableScanNode {
@@ -183,7 +187,7 @@ message CreateExternalTableNode {
   string delimiter = 8;
   string definition = 9;
   string file_compression_type = 10;
-  repeated LogicalExprNode order_exprs = 13;
+  repeated LogicalExprNodeCollection order_exprs = 13;
   bool unbounded = 14;
   map<string, string> options = 11;
 }
@@ -1188,6 +1192,10 @@ message ScanLimit {
   uint32 limit = 1;
 }
 
+message PhysicalSortExprNodeCollection { // one list of sort expressions; element type of `FileScanExecConf.output_ordering`
+  repeated PhysicalSortExprNode physical_sort_expr_nodes = 1;
+}
+
 message FileScanExecConf {
   // Was repeated ConfigOption options = 10;
   reserved 10;
@@ -1199,7 +1207,7 @@ message FileScanExecConf {
   Statistics statistics = 6;
   repeated string table_partition_cols = 7;
   string object_store_url = 8;
-  repeated PhysicalSortExprNode output_ordering = 9;
+  repeated PhysicalSortExprNodeCollection output_ordering = 9;
 }
 
 message ParquetScanExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 6dbe25c2d8..0134c71e9a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -10847,6 +10847,98 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
         deserializer.deserialize_struct("datafusion.LogicalExprNode", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for LogicalExprNodeCollection { // serde output: a struct with at most one field, `logicalExprNodes`
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0; // number of fields that will actually be written
+        if !self.logical_expr_nodes.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.LogicalExprNodeCollection", len)?;
+        if !self.logical_expr_nodes.is_empty() { // an empty list is left out of the output entirely
+            struct_ser.serialize_field("logicalExprNodes", &self.logical_expr_nodes)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for LogicalExprNodeCollection { // reads the struct back from a map keyed by the names in FIELDS
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[ // both accepted spellings of the single field
+            "logical_expr_nodes",
+            "logicalExprNodes",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField { // identifies which known key was just read
+            LogicalExprNodes,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "logicalExprNodes" | "logical_expr_nodes" => Ok(GeneratedField::LogicalExprNodes),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)), // any other key is rejected
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = LogicalExprNodeCollection;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.LogicalExprNodeCollection")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<LogicalExprNodeCollection, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut logical_expr_nodes__ = None; // stays None until the key is seen
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::LogicalExprNodes => {
+                            if logical_expr_nodes__.is_some() { // the same key appearing twice is an error
+                                return Err(serde::de::Error::duplicate_field("logicalExprNodes"));
+                            }
+                            logical_expr_nodes__ = Some(map.next_value()?);
+                        }
+                    }
+                }
+                Ok(LogicalExprNodeCollection {
+                    logical_expr_nodes: logical_expr_nodes__.unwrap_or_default(), // absent key falls back to the field type's Default
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.LogicalExprNodeCollection", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for LogicalExtensionNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -15741,6 +15833,98 @@ impl<'de> serde::Deserialize<'de> for PhysicalSortExprNode {
         deserializer.deserialize_struct("datafusion.PhysicalSortExprNode", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for PhysicalSortExprNodeCollection { // serde output: a struct with at most one field, `physicalSortExprNodes`
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0; // number of fields that will actually be written
+        if !self.physical_sort_expr_nodes.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalSortExprNodeCollection", len)?;
+        if !self.physical_sort_expr_nodes.is_empty() { // an empty list is left out of the output entirely
+            struct_ser.serialize_field("physicalSortExprNodes", &self.physical_sort_expr_nodes)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalSortExprNodeCollection { // reads the struct back from a map keyed by the names in FIELDS
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[ // both accepted spellings of the single field
+            "physical_sort_expr_nodes",
+            "physicalSortExprNodes",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField { // identifies which known key was just read
+            PhysicalSortExprNodes,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "physicalSortExprNodes" | "physical_sort_expr_nodes" => Ok(GeneratedField::PhysicalSortExprNodes),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)), // any other key is rejected
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PhysicalSortExprNodeCollection;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.PhysicalSortExprNodeCollection")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<PhysicalSortExprNodeCollection, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut physical_sort_expr_nodes__ = None; // stays None until the key is seen
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::PhysicalSortExprNodes => {
+                            if physical_sort_expr_nodes__.is_some() { // the same key appearing twice is an error
+                                return Err(serde::de::Error::duplicate_field("physicalSortExprNodes"));
+                            }
+                            physical_sort_expr_nodes__ = Some(map.next_value()?);
+                        }
+                    }
+                }
+                Ok(PhysicalSortExprNodeCollection {
+                    physical_sort_expr_nodes: physical_sort_expr_nodes__.unwrap_or_default(), // absent key falls back to the field type's Default
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.PhysicalSortExprNodeCollection", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PhysicalTryCastNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 7e48db10f3..bc8987b1d0 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -129,6 +129,12 @@ pub struct ParquetFormat {}
 pub struct AvroFormat {}
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LogicalExprNodeCollection { // prost message wrapping one list of `LogicalExprNode`
+    #[prost(message, repeated, tag = "1")]
+    pub logical_expr_nodes: ::prost::alloc::vec::Vec<LogicalExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ListingTableScanNode {
     #[prost(message, optional, tag = "14")]
     pub table_name: ::core::option::Option<OwnedTableReference>,
@@ -149,7 +155,7 @@ pub struct ListingTableScanNode {
     #[prost(uint32, tag = "9")]
     pub target_partitions: u32,
     #[prost(message, repeated, tag = "13")]
-    pub file_sort_order: ::prost::alloc::vec::Vec<LogicalExprNode>,
+    pub file_sort_order: ::prost::alloc::vec::Vec<LogicalExprNodeCollection>,
     #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12")]
     pub file_format_type: ::core::option::Option<
         listing_table_scan_node::FileFormatType,
@@ -292,7 +298,7 @@ pub struct CreateExternalTableNode {
     #[prost(string, tag = "10")]
     pub file_compression_type: ::prost::alloc::string::String,
     #[prost(message, repeated, tag = "13")]
-    pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
+    pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNodeCollection>,
     #[prost(bool, tag = "14")]
     pub unbounded: bool,
     #[prost(map = "string, string", tag = "11")]
@@ -1688,6 +1694,12 @@ pub struct ScanLimit {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalSortExprNodeCollection { // prost message wrapping one list of `PhysicalSortExprNode`
+    #[prost(message, repeated, tag = "1")]
+    pub physical_sort_expr_nodes: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileScanExecConf {
     #[prost(message, repeated, tag = "1")]
     pub file_groups: ::prost::alloc::vec::Vec<FileGroup>,
@@ -1704,7 +1716,7 @@ pub struct FileScanExecConf {
     #[prost(string, tag = "8")]
     pub object_store_url: ::prost::alloc::string::String,
     #[prost(message, repeated, tag = "9")]
-    pub output_ordering: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
+    pub output_ordering: ::prost::alloc::vec::Vec<PhysicalSortExprNodeCollection>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 47107affb3..d1102faee3 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -17,7 +17,7 @@
 
 use crate::common::{byte_to_string, proto_error, str_to_byte};
 use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
-use crate::protobuf::CustomTableScanNode;
+use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
 use crate::{
     convert_required,
     protobuf::{
@@ -325,19 +325,15 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .map(|expr| from_proto::parse_expr(expr, ctx))
                     .collect::<Result<Vec<_>, _>>()?;
 
-                let file_sort_order = scan
-                    .file_sort_order
-                    .iter()
-                    .map(|expr| from_proto::parse_expr(expr, ctx))
-                    .collect::<Result<Vec<_>, _>>()?;
-
-                // Protobuf doesn't distinguish between "not present"
-                // and empty
-                let file_sort_order = if file_sort_order.is_empty() {
-                    None
-                } else {
-                    Some(file_sort_order)
-                };
+                let mut all_sort_orders = vec![];
+                for order in &scan.file_sort_order {
+                    let file_sort_order = order
+                        .logical_expr_nodes
+                        .iter()
+                        .map(|expr| from_proto::parse_expr(expr, ctx))
+                        .collect::<Result<Vec<_>, _>>()?;
+                    all_sort_orders.push(file_sort_order)
+                }
 
                 let file_format: Arc<dyn FileFormat> =
                     match scan.file_format_type.as_ref().ok_or_else(|| {
@@ -384,7 +380,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     )
                     .with_collect_stat(scan.collect_stat)
                     .with_target_partitions(scan.target_partitions as usize)
-                    .with_file_sort_order(file_sort_order);
+                    .with_file_sort_order(all_sort_orders);
 
                 let config =
                     ListingTableConfig::new_with_multi_paths(table_paths.clone())
@@ -505,11 +501,15 @@ impl AsLogicalPlan for LogicalPlanNode {
                     )))?
                 }
 
-                let order_exprs = create_extern_table
-                    .order_exprs
-                    .iter()
-                    .map(|expr| from_proto::parse_expr(expr, ctx))
-                    .collect::<Result<Vec<Expr>, _>>()?;
+                let mut order_exprs = vec![];
+                for expr in &create_extern_table.order_exprs {
+                    let order_expr = expr
+                        .logical_expr_nodes
+                        .iter()
+                        .map(|expr| from_proto::parse_expr(expr, ctx))
+                        .collect::<Result<Vec<Expr>, _>>()?;
+                    order_exprs.push(order_expr)
+                }
 
                 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
                     schema: pb_schema.try_into()?,
@@ -849,15 +849,17 @@ impl AsLogicalPlan for LogicalPlanNode {
                     };
 
                     let options = listing_table.options();
-                    let file_sort_order =
-                        if let Some(file_sort_order) = &options.file_sort_order {
-                            file_sort_order
+
+                    let mut exprs_vec: Vec<LogicalExprNodeCollection> = vec![];
+                    for order in &options.file_sort_order {
+                        let expr_vec = LogicalExprNodeCollection {
+                            logical_expr_nodes: order
                                 .iter()
                                 .map(|expr| expr.try_into())
-                                .collect::<Result<Vec<protobuf::LogicalExprNode>, _>>()?
-                        } else {
-                            vec![]
+                                .collect::<Result<Vec<_>, to_proto::Error>>()?,
                         };
+                        exprs_vec.push(expr_vec);
+                    }
 
                     Ok(protobuf::LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ListingScan(
@@ -880,7 +882,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                                 projection,
                                 filters,
                                 target_partitions: options.target_partitions as u32,
-                                file_sort_order,
+                                file_sort_order: exprs_vec,
                             },
                         )),
                     })
@@ -1188,28 +1190,39 @@ impl AsLogicalPlan for LogicalPlanNode {
                     unbounded,
                     options,
                 },
-            )) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
-                    protobuf::CreateExternalTableNode {
-                        name: Some(name.clone().into()),
-                        location: location.clone(),
-                        file_type: file_type.clone(),
-                        has_header: *has_header,
-                        schema: Some(df_schema.try_into()?),
-                        table_partition_cols: table_partition_cols.clone(),
-                        if_not_exists: *if_not_exists,
-                        delimiter: String::from(*delimiter),
-                        order_exprs: order_exprs
+            )) => {
+                let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
+                for order in order_exprs {
+                    let temp = LogicalExprNodeCollection {
+                        logical_expr_nodes: order
                             .iter()
                             .map(|expr| expr.try_into())
-                            .collect::<Result<Vec<_>, to_proto::Error>>()?,
-                        definition: definition.clone().unwrap_or_default(),
-                        file_compression_type: file_compression_type.to_string(),
-                        unbounded: *unbounded,
-                        options: options.clone(),
-                    },
-                )),
-            }),
+                            .collect::<Result<Vec<_>, to_proto::Error>>(
+                        )?,
+                    };
+                    converted_order_exprs.push(temp);
+                }
+
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
+                        protobuf::CreateExternalTableNode {
+                            name: Some(name.clone().into()),
+                            location: location.clone(),
+                            file_type: file_type.clone(),
+                            has_header: *has_header,
+                            schema: Some(df_schema.try_into()?),
+                            table_partition_cols: table_partition_cols.clone(),
+                            if_not_exists: *if_not_exists,
+                            delimiter: String::from(*delimiter),
+                            order_exprs: converted_order_exprs,
+                            definition: definition.clone().unwrap_or_default(),
+                            file_compression_type: file_compression_type.to_string(),
+                            unbounded: *unbounded,
+                            options: options.clone(),
+                        },
+                    )),
+                })
+            }
             LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                 name,
                 input,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index f8f55242a8..8af4815181 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -430,29 +430,28 @@ pub fn parse_protobuf_file_scan_config(
         })
         .collect::<Result<Vec<(String, DataType)>>>()?;
 
-    let output_ordering = proto
-        .output_ordering
-        .iter()
-        .map(|o| {
-            let expr = o
-                .expr
-                .as_ref()
-                .map(|e| parse_physical_expr(e.as_ref(), registry, &schema))
-                .unwrap()?;
-            Ok(PhysicalSortExpr {
-                expr,
-                options: SortOptions {
-                    descending: !o.asc,
-                    nulls_first: o.nulls_first,
-                },
+    let mut output_ordering = vec![];
+    for node_collection in &proto.output_ordering {
+        let sort_expr = node_collection
+            .physical_sort_expr_nodes
+            .iter()
+            .map(|node| {
+                let expr = node
+                    .expr
+                    .as_ref()
+                    .map(|e| parse_physical_expr(e.as_ref(), registry, &schema))
+                    .unwrap()?;
+                Ok(PhysicalSortExpr {
+                    expr,
+                    options: SortOptions {
+                        descending: !node.asc,
+                        nulls_first: node.nulls_first,
+                    },
+                })
             })
-        })
-        .collect::<Result<Vec<PhysicalSortExpr>>>()?;
-    let output_ordering = if output_ordering.is_empty() {
-        None
-    } else {
-        Some(output_ordering)
-    };
+            .collect::<Result<Vec<PhysicalSortExpr>>>()?;
+        output_ordering.push(sort_expr);
+    }
 
     Ok(FileScanConfig {
         object_store_url,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 4c360dc378..7385a4ac21 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1627,7 +1627,7 @@ mod roundtrip_tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         };
 
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index f2f65b89f0..766906d04a 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -43,7 +43,10 @@ use datafusion::physical_plan::expressions::{
 use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 
 use crate::protobuf;
-use crate::protobuf::{physical_aggregate_expr_node, PhysicalSortExprNode, ScalarValue};
+use crate::protobuf::{
+    physical_aggregate_expr_node, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
+    ScalarValue,
+};
 use datafusion::logical_expr::BuiltinScalarFunction;
 use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, GetIndexedFieldExpr};
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
@@ -481,21 +484,21 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
             .map(|p| p.as_slice().try_into())
             .collect::<Result<Vec<_>, _>>()?;
 
-        let output_ordering = if let Some(output_ordering) = &conf.output_ordering {
-            output_ordering
+        let mut output_orderings = vec![];
+        for order in &conf.output_ordering {
+            let expr_node_vec = order
                 .iter()
-                .map(|o| {
-                    let expr = o.expr.clone().try_into()?;
+                .map(|sort_expr| {
+                    let expr = sort_expr.expr.clone().try_into()?;
                     Ok(PhysicalSortExprNode {
                         expr: Some(Box::new(expr)),
-                        asc: !o.options.descending,
-                        nulls_first: o.options.nulls_first,
+                        asc: !sort_expr.options.descending,
+                        nulls_first: sort_expr.options.nulls_first,
                     })
                 })
-                .collect::<Result<Vec<PhysicalSortExprNode>>>()?
-        } else {
-            vec![]
-        };
+                .collect::<Result<Vec<PhysicalSortExprNode>>>()?;
+            output_orderings.push(expr_node_vec)
+        }
 
         Ok(protobuf::FileScanExecConf {
             file_groups,
@@ -515,7 +518,12 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
                 .map(|x| x.0.clone())
                 .collect::<Vec<_>>(),
             object_store_url: conf.object_store_url.to_string(),
-            output_ordering,
+            output_ordering: output_orderings
+                .into_iter()
+                .map(|e| PhysicalSortExprNodeCollection {
+                    physical_sort_expr_nodes: e,
+                })
+                .collect::<Vec<_>>(),
         })
     }
 }
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index a0e928910d..38dacf35be 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -116,6 +116,9 @@ impl fmt::Display for CopyToSource {
     }
 }
 
+/// A lexicographical ordering, expressed as a list of `OrderByExpr`s.
+pub(crate) type LexOrdering = Vec<OrderByExpr>;
+
 /// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
 ///
 /// Syntax:
@@ -158,7 +161,7 @@ pub struct CreateExternalTable {
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
     /// Ordered expressions
-    pub order_exprs: Vec<OrderByExpr>,
+    pub order_exprs: Vec<LexOrdering>,
     /// Option to not error if table already exists
     pub if_not_exists: bool,
     /// File compression type (GZIP, BZIP2, XZ)
@@ -585,7 +588,7 @@ impl<'a> DFParser<'a> {
             delimiter: Option<char>,
             file_compression_type: Option<CompressionTypeVariant>,
             table_partition_cols: Option<Vec<String>>,
-            order_exprs: Option<Vec<OrderByExpr>>,
+            order_exprs: Vec<LexOrdering>,
             options: Option<HashMap<String, String>>,
         }
         let mut builder = Builder::default();
@@ -621,8 +624,7 @@ impl<'a> DFParser<'a> {
                     }
                     Keyword::WITH => {
                         if self.parser.parse_keyword(Keyword::ORDER) {
-                            ensure_not_set(&builder.order_exprs, "WITH ORDER")?;
-                            builder.order_exprs = Some(self.parse_order_by_exprs()?);
+                            builder.order_exprs.push(self.parse_order_by_exprs()?);
                         } else {
                             self.parser.expect_keyword(Keyword::HEADER)?;
                             self.parser.expect_keyword(Keyword::ROW)?;
@@ -689,7 +691,7 @@ impl<'a> DFParser<'a> {
             delimiter: builder.delimiter.unwrap_or(','),
             location: builder.location.unwrap(),
             table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]),
-            order_exprs: builder.order_exprs.unwrap_or(vec![]),
+            order_exprs: builder.order_exprs,
             if_not_exists,
             file_compression_type: builder
                 .file_compression_type
@@ -1124,14 +1126,14 @@ mod tests {
                 delimiter: ',',
                 location: "foo.csv".into(),
                 table_partition_cols: vec![],
-                order_exprs: vec![OrderByExpr {
+                order_exprs: vec![vec![OrderByExpr {
                     expr: Identifier(Ident {
                         value: "c1".to_owned(),
                         quote_style: None,
                     }),
                     asc,
                     nulls_first,
-                }],
+                }]],
                 if_not_exists: false,
                 file_compression_type: UNCOMPRESSED,
                 unbounded: false,
@@ -1154,7 +1156,7 @@ mod tests {
             delimiter: ',',
             location: "foo.csv".into(),
             table_partition_cols: vec![],
-            order_exprs: vec![
+            order_exprs: vec![vec![
                 OrderByExpr {
                     expr: Identifier(Ident {
                         value: "c1".to_owned(),
@@ -1171,7 +1173,7 @@ mod tests {
                     asc: Some(false),
                     nulls_first: Some(true),
                 },
-            ],
+            ]],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
             unbounded: false,
@@ -1193,7 +1195,7 @@ mod tests {
             delimiter: ',',
             location: "foo.csv".into(),
             table_partition_cols: vec![],
-            order_exprs: vec![OrderByExpr {
+            order_exprs: vec![vec![OrderByExpr {
                 expr: Expr::BinaryOp {
                     left: Box::new(Identifier(Ident {
                         value: "c1".to_owned(),
@@ -1207,7 +1209,7 @@ mod tests {
                 },
                 asc: Some(true),
                 nulls_first: None,
-            }],
+            }]],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
             unbounded: false,
@@ -1238,7 +1240,7 @@ mod tests {
             delimiter: '*',
             location: "foo.parquet".into(),
             table_partition_cols: vec!["c1".into()],
-            order_exprs: vec![OrderByExpr {
+            order_exprs: vec![vec![OrderByExpr {
                 expr: Expr::BinaryOp {
                     left: Box::new(Identifier(Ident {
                         value: "c1".to_owned(),
@@ -1252,7 +1254,7 @@ mod tests {
                 },
                 asc: Some(true),
                 nulls_first: None,
-            }],
+            }]],
             if_not_exists: true,
             file_compression_type: CompressionTypeVariant::ZSTD,
             unbounded: true,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index dada4b147b..7f914c6b91 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::parser::{
-    CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
+    CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt, LexOrdering,
     Statement as DFStatement,
 };
 use crate::planner::{
@@ -44,9 +44,9 @@ use datafusion_expr::{
 };
 use sqlparser::ast;
 use sqlparser::ast::{
-    Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
-    SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement,
-    TableConstraint, TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
+    Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, Query, SchemaName,
+    SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableConstraint,
+    TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
 };
 
 use datafusion_expr::expr::Placeholder;
@@ -578,10 +578,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     fn build_order_by(
         &self,
-        order_exprs: Vec<OrderByExpr>,
+        order_exprs: Vec<LexOrdering>,
         schema: &DFSchemaRef,
         planner_context: &mut PlannerContext,
-    ) -> Result<Vec<datafusion_expr::Expr>> {
+    ) -> Result<Vec<Vec<datafusion_expr::Expr>>> {
         // Ask user to provide a schema if schema is empty.
         if !order_exprs.is_empty() && schema.fields().is_empty() {
             return Err(DataFusionError::Plan(
@@ -589,22 +589,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     .to_owned(),
             ));
         }
-        // Convert each OrderByExpr to a SortExpr:
-        let result = self.order_by_to_sort_expr(&order_exprs, schema, planner_context)?;
-        // Verify that columns of all SortExprs exist in the schema:
-        for expr in result.iter() {
-            for column in expr.to_columns()?.iter() {
-                if !schema.has_column(column) {
-                    // Return an error if any column is not in the schema:
-                    return Err(DataFusionError::Plan(format!(
-                        "Column {column} is not in schema"
-                    )));
+
+        let mut all_results = vec![];
+        for expr in order_exprs {
+            // Convert each OrderByExpr to a SortExpr:
+            let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?;
+            // Verify that columns of all SortExprs exist in the schema:
+            for expr in expr_vec.iter() {
+                for column in expr.to_columns()?.iter() {
+                    if !schema.has_column(column) {
+                        // Return an error if any column is not in the schema:
+                        return Err(DataFusionError::Plan(format!(
+                            "Column {column} is not in schema"
+                        )));
+                    }
                 }
             }
+            // All SortExprs of this ordering are valid; add them to the results as one expression vector
+            all_results.push(expr_vec)
         }
-
-        // If all SortExprs are valid, return them as an expression vector
-        Ok(result)
+        Ok(all_results)
     }
 
     /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 0c65beb4e1..6273e0e589 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -112,7 +112,7 @@ pub async fn from_substrait_rel(
                         projection: None,
                         limit: None,
                         table_partition_cols: vec![],
-                        output_ordering: None,
+                        output_ordering: vec![],
                         infinite_source: false,
                     };
 
diff --git a/datafusion/substrait/tests/roundtrip_physical_plan.rs b/datafusion/substrait/tests/roundtrip_physical_plan.rs
index d213ccf187..ab77f19ea0 100644
--- a/datafusion/substrait/tests/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            output_ordering: None,
+            output_ordering: vec![],
             infinite_source: false,
         };
         let parquet_exec: Arc<dyn ExecutionPlan> =