Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/14 17:12:05 UTC

[arrow-datafusion] branch master updated: Add ability to specify external sort information for ListingTables (#4170)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d2814c960 Add ability to specify external sort information for ListingTables (#4170)
d2814c960 is described below

commit d2814c960168b45c4a0f5d7bbb72d9f412cb08bd
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Nov 14 12:11:59 2022 -0500

    Add ability to specify external sort information for ListingTables (#4170)
    
    * Add ability to specify external sort information for ParquetExec
    
    * Updates
    
    * Allow any listing table to specify its output order
    
    * Start writing tests
    
    * more tests
    
    * Add doc comments on multiple partitions and sort order
    
    * Add tests for multiple files in a partition
    
    * Ignore sort order on parquet exec if it has more than one file
    
    * Add support for other listing tables
    
    * fmt
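
For readers skimming the change: below is a minimal usage sketch (not part of this commit) of how a caller might declare a pre-known sort order. It is assembled from the `ListingOptions` fields and the `Expr::sort` builder exercised in the tests in this diff; treat it as illustrative rather than authoritative.

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::col;

/// Listing options declaring that every file is already sorted by
/// (string_col ASC NULLS LAST, int_col ASC NULLS LAST), so DataFusion
/// may omit redundant sorts when each partition scans a single file.
fn sorted_parquet_options() -> ListingOptions {
    let file_sort_order = Some(vec![
        col("string_col").sort(true, false), // asc = true, nulls_first = false
        col("int_col").sort(true, false),
    ]);
    ListingOptions {
        file_sort_order,
        ..ListingOptions::new(Arc::new(ParquetFormat::default()))
    }
}
```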
---
 benchmarks/src/bin/tpch.rs                         |   1 +
 .../examples/parquet_sql_multiple_files.rs         |   1 +
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 datafusion/core/src/datasource/listing/table.rs    | 166 ++++++++++++++++++++-
 .../core/src/datasource/listing_table_factory.rs   |   1 +
 datafusion/core/src/execution/context.rs           |   5 +-
 datafusion/core/src/execution/options.rs           |   4 +
 .../core/src/physical_optimizer/enforcement.rs     |   1 +
 .../core/src/physical_optimizer/repartition.rs     |   1 +
 .../core/src/physical_plan/file_format/avro.rs     |   7 +-
 .../core/src/physical_plan/file_format/csv.rs      |   5 +-
 .../src/physical_plan/file_format/file_stream.rs   |   1 +
 .../core/src/physical_plan/file_format/json.rs     |   7 +-
 .../core/src/physical_plan/file_format/mod.rs      |  81 +++++++++-
 .../core/src/physical_plan/file_format/parquet.rs  |  63 +++++---
 datafusion/core/src/test/mod.rs                    |   1 +
 datafusion/core/tests/custom_parquet_reader.rs     |   1 +
 datafusion/core/tests/parquet/page_pruning.rs      |   1 +
 datafusion/core/tests/row.rs                       |   1 +
 datafusion/core/tests/sql/parquet.rs               |  99 ++++++++++++
 datafusion/proto/proto/datafusion.proto            |   1 +
 datafusion/proto/src/generated/pbjson.rs           |  18 +++
 datafusion/proto/src/generated/prost.rs            |   2 +
 datafusion/proto/src/logical_plan.rs               |  43 ++++--
 parquet-test-utils/src/lib.rs                      |   1 +
 25 files changed, 473 insertions(+), 40 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 838709cf0..4517016af 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -397,6 +397,7 @@ async fn get_table(
         target_partitions,
         collect_stat: ctx.config.collect_statistics,
         table_partition_cols: vec![],
+        file_sort_order: None,
     };
 
     let table_path = ListingTableUrl::parse(path)?;
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 6004ce67d..6e8ef2397 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -39,6 +39,7 @@ async fn main() -> Result<()> {
         table_partition_cols: vec![],
         collect_stat: true,
         target_partitions: 1,
+        file_sort_order: None,
     };
 
     // Register a listing table - this will use all files in the directory as data sources
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 6775117e2..507e437a4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -124,6 +124,7 @@ pub(crate) mod test_util {
                     limit,
                     table_partition_cols: vec![],
                     config_options: ConfigOptions::new().into_shareable(),
+                    output_ordering: None,
                 },
                 &[],
             )
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 05f560f20..ab08d6f18 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -20,9 +20,11 @@
 use std::str::FromStr;
 use std::{any::Any, sync::Arc};
 
+use arrow::compute::SortOptions;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use dashmap::DashMap;
+use datafusion_physical_expr::PhysicalSortExpr;
 use futures::{future, stream, StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -38,6 +40,7 @@ use crate::datasource::{
     TableProvider, TableType,
 };
 use crate::logical_expr::TableProviderFilterPushDown;
+use crate::physical_plan;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
@@ -54,6 +57,7 @@ use super::PartitionedFile;
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 
 /// Configuration for creating a 'ListingTable'
+#[derive(Debug, Clone)]
 pub struct ListingTableConfig {
     /// Paths on the `ObjectStore` for creating `ListingTable`.
     /// They should share the same schema and object store.
@@ -162,6 +166,7 @@ impl ListingTableConfig {
             file_extension,
             target_partitions: ctx.config.target_partitions,
             table_partition_cols: vec![],
+            file_sort_order: None,
         };
 
         Ok(Self {
@@ -198,7 +203,7 @@ impl ListingTableConfig {
 }
 
 /// Options for creating a `ListingTable`
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct ListingOptions {
     /// A suffix on which files should be filtered (leave empty to
     /// keep all files on the path)
@@ -220,6 +225,16 @@ pub struct ListingOptions {
     /// Group files to avoid that the number of partitions exceeds
     /// this limit
     pub target_partitions: usize,
+    /// Optional pre-known sort order. Each element must be an
+    /// `Expr::Sort` over a single column reference.
+    ///
+    /// DataFusion may take advantage of this ordering to omit sorts
+    /// or use more efficient algorithms. Currently the sortedness
+    /// must be declared by some external mechanism if it is known;
+    /// in the future it may be determined automatically, for example
+    /// using parquet metadata.
+    ///
+    /// See <https://github.com/apache/arrow-datafusion/issues/4177>
+    pub file_sort_order: Option<Vec<Expr>>,
 }
 
 impl ListingOptions {
@@ -236,6 +251,7 @@ impl ListingOptions {
             table_partition_cols: vec![],
             collect_stat: true,
             target_partitions: 1,
+            file_sort_order: None,
         }
     }
 
@@ -361,6 +377,46 @@ impl ListingTable {
     pub fn options(&self) -> &ListingOptions {
         &self.options
     }
+
+    /// If file_sort_order is specified, creates the appropriate physical expressions
+    fn try_create_output_ordering(&self) -> Result<Option<Vec<PhysicalSortExpr>>> {
+        let file_sort_order =
+            if let Some(file_sort_order) = self.options.file_sort_order.as_ref() {
+                file_sort_order
+            } else {
+                return Ok(None);
+            };
+
+        // convert each expr to a physical sort expr
+        let sort_exprs = file_sort_order
+            .iter()
+            .map(|expr| {
+                if let Expr::Sort { expr, asc, nulls_first } = expr {
+                    if let Expr::Column(col) = expr.as_ref() {
+                        let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
+                        Ok(PhysicalSortExpr {
+                            expr,
+                            options: SortOptions {
+                                descending: !asc,
+                                nulls_first: *nulls_first,
+                            },
+                        })
+                    }
+                    else {
+                        Err(DataFusionError::Plan(
+                            format!("Only support single column references in output_ordering, got {:?}", expr)
+                        ))
+                    }
+                } else {
+                    Err(DataFusionError::Plan(
+                        format!("Expected Expr::Sort in output_ordering, but got {:?}", expr)
+                    ))
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Some(sort_exprs))
+    }
 }
 
 #[async_trait]
@@ -405,6 +461,7 @@ impl TableProvider for ListingTable {
                     statistics,
                     projection: projection.clone(),
                     limit,
+                    output_ordering: self.try_create_output_ordering()?,
                     table_partition_cols: self.options.table_partition_cols.clone(),
                     config_options: ctx.config.config_options(),
                 },
@@ -507,6 +564,7 @@ mod tests {
     };
     use arrow::datatypes::DataType;
     use chrono::DateTime;
+    use datafusion_common::assert_contains;
 
     use super::*;
 
@@ -554,6 +612,109 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_try_create_output_ordering() {
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+        let table_path = ListingTableUrl::parse(filename).unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let schema = options.infer_schema(&state, &table_path).await.unwrap();
+
+        use physical_plan::expressions::col as physical_col;
+        use std::ops::Add;
+
+        // (file_sort_order, expected_result)
+        let cases = vec![
+            (None, Ok(None)),
+            // empty list
+            (Some(vec![]), Ok(Some(vec![]))),
+            // not a sort expr
+            (
+                Some(vec![col("string_col")]),
+                Err("Expected Expr::Sort in output_ordering, but got string_col"),
+            ),
+            // sort expr, but non column
+            (
+                Some(vec![
+                    col("int_col").add(lit(1)).sort(true, true),
+                ]),
+                Err("Only support single column references in output_ordering, got int_col + Int32(1)"),
+            ),
+            // ok with one column
+            (
+                Some(vec![col("string_col").sort(true, false)]),
+                Ok(Some(vec![PhysicalSortExpr {
+                    expr: physical_col("string_col", &schema).unwrap(),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: false,
+                    },
+                }]))
+
+            ),
+            // ok with two columns, different options
+            (
+                Some(vec![
+                    col("string_col").sort(true, false),
+                    col("int_col").sort(false, true),
+                ]),
+                Ok(Some(vec![
+                    PhysicalSortExpr {
+                        expr: physical_col("string_col", &schema).unwrap(),
+                        options: SortOptions {
+                            descending: false,
+                            nulls_first: false,
+                        },
+                    },
+                    PhysicalSortExpr {
+                        expr: physical_col("int_col", &schema).unwrap(),
+                        options: SortOptions {
+                            descending: true,
+                            nulls_first: true,
+                        },
+                    },
+                ]))
+
+            ),
+
+        ];
+
+        for (file_sort_order, expected_result) in cases {
+            let options = ListingOptions {
+                file_sort_order,
+                ..options.clone()
+            };
+            let config = ListingTableConfig::new(table_path.clone())
+                .with_listing_options(options)
+                .with_schema(schema.clone());
+
+            let table =
+                ListingTable::try_new(config.clone()).expect("Creating the table");
+            let ordering_result = table.try_create_output_ordering();
+
+            match (expected_result, ordering_result) {
+                (Ok(expected), Ok(result)) => {
+                    assert_eq!(expected, result);
+                }
+                (Err(expected), Err(result)) => {
+                    // can't compare the DataFusionError directly
+                    let result = result.to_string();
+                    let expected = expected.to_string();
+                    assert_contains!(result, expected);
+                }
+                (expected_result, ordering_result) => {
+                    panic!(
+                        "expected: {:#?}\n\nactual:{:#?}",
+                        expected_result, ordering_result
+                    );
+                }
+            }
+        }
+    }
+
     #[tokio::test]
     async fn read_empty_table() -> Result<()> {
         let ctx = SessionContext::new();
@@ -566,6 +727,7 @@ mod tests {
             table_partition_cols: vec![String::from("p1")],
             target_partitions: 4,
             collect_stat: true,
+            file_sort_order: None,
         };
 
         let table_path = ListingTableUrl::parse("test:///table/").unwrap();
@@ -768,6 +930,7 @@ mod tests {
             table_partition_cols: vec![],
             target_partitions,
             collect_stat: true,
+            file_sort_order: None,
         };
 
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
@@ -805,6 +968,7 @@ mod tests {
             table_partition_cols: vec![],
             target_partitions,
             collect_stat: true,
+            file_sort_order: None,
         };
 
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 253172738..f57ffa9db 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -67,6 +67,7 @@ impl TableProviderFactory for ListingTableFactory {
             file_extension: file_extension.to_owned(),
             target_partitions: 1,
             table_partition_cols: vec![],
+            file_sort_order: None,
         };
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ce88b9f20..8b7bf1ffb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -564,6 +564,7 @@ impl SessionContext {
                     file_extension: file_extension.to_owned(),
                     target_partitions: self.copied_config().target_partitions,
                     table_partition_cols: cmd.table_partition_cols.clone(),
+                    file_sort_order: None,
                 };
                 self.register_listing_table(
                     cmd.name.as_str(),
@@ -807,7 +808,7 @@ impl SessionContext {
         table_path: impl AsRef<str>,
         options: ListingOptions,
         provided_schema: Option<SchemaRef>,
-        sql: Option<String>,
+        sql_definition: Option<String>,
     ) -> Result<()> {
         let table_path = ListingTableUrl::parse(table_path)?;
         let resolved_schema = match provided_schema {
@@ -817,7 +818,7 @@ impl SessionContext {
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
             .with_schema(resolved_schema);
-        let table = ListingTable::try_new(config)?.with_definition(sql);
+        let table = ListingTable::try_new(config)?.with_definition(sql_definition);
         self.register_table(name, Arc::new(table))?;
         Ok(())
     }
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 150a20670..041639f0f 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -151,6 +151,7 @@ impl<'a> CsvReadOptions<'a> {
             file_extension: self.file_extension.to_owned(),
             target_partitions,
             table_partition_cols: self.table_partition_cols.clone(),
+            file_sort_order: None,
         }
     }
 }
@@ -225,6 +226,7 @@ impl<'a> ParquetReadOptions<'a> {
             file_extension: self.file_extension.to_owned(),
             target_partitions,
             table_partition_cols: self.table_partition_cols.clone(),
+            file_sort_order: None,
         }
     }
 }
@@ -274,6 +276,7 @@ impl<'a> AvroReadOptions<'a> {
             file_extension: self.file_extension.to_owned(),
             target_partitions,
             table_partition_cols: self.table_partition_cols.clone(),
+            file_sort_order: None,
         }
     }
 }
@@ -346,6 +349,7 @@ impl<'a> NdJsonReadOptions<'a> {
             file_extension: self.file_extension.to_owned(),
             target_partitions,
             table_partition_cols: self.table_partition_cols.clone(),
+            file_sort_order: None,
         }
     }
 }
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 21931397f..13f6d0bfa 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1069,6 +1069,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 839908d06..f88ef4ab0 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -271,6 +271,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 38178a976..59faac8fa 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
 use std::any::Any;
 use std::sync::Arc;
 
-use super::FileScanConfig;
+use super::{get_output_ordering, FileScanConfig};
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
@@ -73,7 +73,7 @@ impl ExecutionPlan for AvroExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        get_output_ordering(&self.base_config)
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -235,6 +235,7 @@ mod tests {
             limit: None,
             table_partition_cols: vec![],
             config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
@@ -305,6 +306,7 @@ mod tests {
             limit: None,
             table_partition_cols: vec![],
             config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
@@ -374,6 +376,7 @@ mod tests {
             limit: None,
             table_partition_cols: vec!["date".to_owned()],
             config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index faa1401d2..8b50755f4 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -43,7 +43,7 @@ use std::path::Path;
 use std::sync::Arc;
 use tokio::task::{self, JoinHandle};
 
-use super::FileScanConfig;
+use super::{get_output_ordering, FileScanConfig};
 
 /// Execution plan for scanning a CSV file
 #[derive(Debug, Clone)]
@@ -109,8 +109,9 @@ impl ExecutionPlan for CsvExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
+    /// See comments on `get_output_ordering`: the declared output order
+    /// cannot be preserved when a partition is assigned more than one file
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        get_output_ordering(&self.base_config)
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index df12f3105..063238a6d 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -368,6 +368,7 @@ mod tests {
             limit,
             table_partition_cols: vec![],
             config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
         };
 
         let file_stream = FileStream::new(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 409c53639..a2eb54920 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -43,7 +43,7 @@ use std::path::Path;
 use std::sync::Arc;
 use tokio::task::{self, JoinHandle};
 
-use super::FileScanConfig;
+use super::{get_output_ordering, FileScanConfig};
 
 /// Execution plan for scanning NdJson data source
 #[derive(Debug, Clone)]
@@ -88,7 +88,7 @@ impl ExecutionPlan for NdJsonExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        get_output_ordering(&self.base_config)
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -328,6 +328,7 @@ mod tests {
                 limit: Some(3),
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             file_compression_type.to_owned(),
         );
@@ -404,6 +405,7 @@ mod tests {
                 limit: Some(3),
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             file_compression_type.to_owned(),
         );
@@ -450,6 +452,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             file_compression_type.to_owned(),
         );
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 11d0b9fad..fbeab4cbf 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -38,6 +38,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 pub use avro::AvroExec;
+use datafusion_physical_expr::PhysicalSortExpr;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
@@ -52,7 +53,7 @@ use crate::{
 use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
 use lazy_static::lazy_static;
-use log::info;
+use log::{debug, info};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use std::{
@@ -100,6 +101,8 @@ pub struct FileScanConfig {
     pub limit: Option<usize>,
     /// The partitioning column names
     pub table_partition_cols: Vec<String>,
+    /// The order in which the data is sorted, if known.
+    pub output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Configuration options passed to the physical plans
     pub config_options: Arc<RwLock<ConfigOptions>>,
 }
@@ -445,6 +448,81 @@ impl From<ObjectMeta> for FileMeta {
     }
 }
 
+/// The various listing tables do not attempt to read all files
+/// concurrently; instead, they read files in sequence within a
+/// partition. This is an important property, as it allows plans to
+/// run against thousands of files without trying to open them all
+/// concurrently.
+///
+/// However, it means that if more than one file is assigned to a
+/// partition, the output sort order will not be preserved, as
+/// illustrated in the following diagrams:
+///
+/// When only 1 file is assigned to each partition, each partition is
+/// correctly sorted on `(A, B, C)`
+///
+/// ```text
+///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
+///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
+///  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
+///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
+///  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
+///┃                                          │                    │                     ┃
+///  │                   │ │                    │                    │                 │
+///┃                                          │                    │                     ┃
+///  │                   │ │                    │                    │                 │
+///┃                                          │                    │                     ┃
+///  │                   │ │                    │                    │                 │
+///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
+///     DataFusion           DataFusion           DataFusion           DataFusion
+///┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
+/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
+///
+///                                      ParquetExec
+///```
+///
+/// However, when more than 1 file is assigned to a partition, the
+/// partition is NOT correctly sorted on `(A, B, C)`: once the second
+/// file is scanned, the same values of A, B and C can appear again,
+/// so the stream as a whole is no longer sorted.
+///
+///```text
+///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
+///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
+///┃   ┌───────────────┐     ┌──────────────┐ │
+///  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
+///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
+///  │ └───────────────┘ │ │ └──────────────┘   ┃
+///┃   ┌───────────────┐     ┌──────────────┐ │
+///  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
+///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
+///  │ └───────────────┘ │ │ └──────────────┘   ┃
+///┃                                          │
+///  │                   │ │                    ┃
+///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+///     DataFusion           DataFusion         ┃
+///┃    Partition 1          Partition 2
+/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
+///
+///              ParquetExec
+///```
+pub(crate) fn get_output_ordering(
+    base_config: &FileScanConfig,
+) -> Option<&[PhysicalSortExpr]> {
+    if let Some(output_ordering) = base_config.output_ordering.as_ref() {
+        if base_config.file_groups.iter().any(|group| group.len() > 1) {
+            debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
+                   output_ordering, base_config.file_groups);
+            None
+        } else {
+            Some(output_ordering)
+        }
+    } else {
+        None
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{
@@ -710,6 +788,7 @@ mod tests {
             statistics,
             table_partition_cols,
             config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
         }
     }
 }
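
To make the guard in `get_output_ordering` concrete, here is a standalone sketch with simplified stand-in types (plain strings, not the real `FileScanConfig` or `PhysicalSortExpr`): the declared ordering survives only if every file group contains at most one file. This is a sketch of the rule, not the production code.

```rust
/// Stand-in types: each inner Vec represents one DataFusion partition's
/// assigned files; `declared` is whatever sort keys the user specified.
fn effective_ordering<'a>(
    file_groups: &[Vec<&str>],
    declared: Option<&'a [&'a str]>,
) -> Option<&'a [&'a str]> {
    match declared {
        // Every partition scans at most one file: per-file sortedness
        // carries over to the whole partition.
        Some(ord) if file_groups.iter().all(|g| g.len() <= 1) => Some(ord),
        // Some partition concatenates several sorted files; the result
        // is no longer globally sorted, so report no ordering.
        _ => None,
    }
}

#[test]
fn ordering_dropped_with_multi_file_groups() {
    let declared = ["A", "B", "C"];
    assert_eq!(
        effective_ordering(&[vec!["1.parquet"], vec!["2.parquet"]], Some(&declared[..])),
        Some(&declared[..]),
    );
    assert_eq!(
        effective_ordering(&[vec!["1.parquet", "3.parquet"]], Some(&declared[..])),
        None,
    );
}
```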
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 7a44749c8..fa65d7656 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -65,6 +65,8 @@ mod row_groups;
 
 pub use metrics::ParquetFileMetrics;
 
+use super::get_output_ordering;
+
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -236,7 +238,7 @@ impl ExecutionPlan for ParquetExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        get_output_ordering(&self.base_config)
     }
 
     fn with_new_children(
@@ -301,24 +303,29 @@ impl ExecutionPlan for ParquetExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                if let Some(pre) = &self.pruning_predicate {
-                    write!(
-                        f,
-                        "ParquetExec: limit={:?}, partitions={}, predicate={}, projection={}",
-                        self.base_config.limit,
-                        super::FileGroupsDisplay(&self.base_config.file_groups),
-                        pre.predicate_expr(),
-                        super::ProjectSchemaDisplay(&self.projected_schema),
-                    )
-                } else {
-                    write!(
-                        f,
-                        "ParquetExec: limit={:?}, partitions={}, projection={}",
-                        self.base_config.limit,
-                        super::FileGroupsDisplay(&self.base_config.file_groups),
-                        super::ProjectSchemaDisplay(&self.projected_schema),
-                    )
-                }
+                let pruning_predicate_string = self
+                    .pruning_predicate
+                    .as_ref()
+                    // TODO change this to be pruning_predicate rather than 'predicate'
+                    // to avoid confusion
+                    // https://github.com/apache/arrow-datafusion/issues/4020
+                    .map(|pre| format!(", predicate={}", pre.predicate_expr()))
+                    .unwrap_or_default();
+
+                let output_ordering_string = self
+                    .output_ordering()
+                    .map(make_output_ordering_string)
+                    .unwrap_or_default();
+
+                write!(
+                    f,
+                    "ParquetExec: limit={:?}, partitions={}{}{}, projection={}",
+                    self.base_config.limit,
+                    super::FileGroupsDisplay(&self.base_config.file_groups),
+                    pruning_predicate_string,
+                    output_ordering_string,
+                    super::ProjectSchemaDisplay(&self.projected_schema),
+                )
             }
         }
     }
@@ -332,6 +339,20 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
+fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String {
+    use std::fmt::Write;
+    let mut w: String = ", output_ordering=[".into();
+
+    for (i, e) in ordering.iter().enumerate() {
+        if i > 0 {
+            write!(&mut w, ", ").unwrap()
+        }
+        write!(&mut w, "{}", e).unwrap()
+    }
+    write!(&mut w, "]").unwrap();
+    w
+}
+
 /// Implements [`FormatReader`] for a parquet file
 struct ParquetOpener {
     partition_index: usize,
@@ -731,6 +752,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             predicate,
             None,
@@ -1260,6 +1282,7 @@ mod tests {
                     limit: None,
                     table_partition_cols: vec![],
                     config_options: ConfigOptions::new().into_shareable(),
+                    output_ordering: None,
                 },
                 None,
                 None,
@@ -1362,6 +1385,7 @@ mod tests {
                     "day".to_owned(),
                 ],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             None,
             None,
@@ -1421,6 +1445,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             None,
             None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index be27b2596..e36a46b6d 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -167,6 +167,7 @@ pub fn partitioned_csv_config(
         limit: None,
         table_partition_cols: vec![],
         config_options: ConfigOptions::new().into_shareable(),
+        output_ordering: None,
     })
 }
 
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ded5fad02..b48061331 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -90,6 +90,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             None,
             None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 2a8791b69..8d8c3bcae 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -70,6 +70,7 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE
             limit: None,
             table_partition_cols: vec![],
             config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
         },
         Some(filter),
         None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 630c28a10..6567c8e75 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -107,6 +107,7 @@ async fn get_exec(
                 limit,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
+                output_ordering: None,
             },
             &[],
         )
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 7a0db41f1..770ebd973 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -18,6 +18,7 @@
 use std::{fs, path::Path};
 
 use ::parquet::arrow::ArrowWriter;
+use datafusion::datasource::listing::ListingOptions;
 use tempfile::TempDir;
 
 use super::*;
@@ -48,6 +49,104 @@ async fn parquet_query() {
     assert_batches_eq!(expected, &actual);
 }
 
+#[tokio::test]
+/// Test that if sort order is specified in ListingOptions, the sort
+/// expressions make it all the way down to the ParquetExec
+async fn parquet_with_sort_order_specified() {
+    let parquet_read_options = ParquetReadOptions::default();
+    let target_partitions = 2;
+
+    // The sort order is not specified
+    let options_no_sort = ListingOptions {
+        file_sort_order: None,
+        ..parquet_read_options.to_listing_options(target_partitions)
+    };
+
+    // The sort order is specified (not actually correct in this case)
+    let file_sort_order = [col("string_col"), col("int_col")]
+        .into_iter()
+        .map(|e| {
+            let ascending = true;
+            let nulls_first = false;
+            e.sort(ascending, nulls_first)
+        })
+        .collect::<Vec<_>>();
+
+    let options_sort = ListingOptions {
+        file_sort_order: Some(file_sort_order),
+        ..parquet_read_options.to_listing_options(target_partitions)
+    };
+
+    // This string appears in ParquetExec if the output ordering is
+    // specified
+    let expected_output_ordering =
+        "output_ordering=[string_col@9 ASC NULLS LAST, int_col@4 ASC NULLS LAST]";
+
+    // when sort not specified, should not appear in the explain plan
+    let num_files = 1;
+    assert_not_contains!(
+        run_query_with_options(options_no_sort, num_files).await,
+        expected_output_ordering
+    );
+
+    // when sort IS specified, SHOULD appear in the explain plan
+    let num_files = 1;
+    assert_contains!(
+        run_query_with_options(options_sort.clone(), num_files).await,
+        expected_output_ordering
+    );
+
+    // when sort IS specified but there are more files than target
+    // partitions, some partition is assigned multiple files and the
+    // sort should NOT appear in the explain plan
+    let num_files = 3;
+    assert_not_contains!(
+        run_query_with_options(options_sort, num_files).await,
+        expected_output_ordering
+    );
+}
+
+/// Runs a limit query against a listing table registered with the given
+/// options, backed by `num_files` copies of alltypes_plain.parquet
+async fn run_query_with_options(options: ListingOptions, num_files: usize) -> String {
+    let ctx = SessionContext::new();
+
+    let testdata = datafusion::test_util::parquet_test_data();
+    let file_path = format!("{}/alltypes_plain.parquet", testdata);
+
+    // Create a directory of parquet files with names
+    // 0.parquet
+    // 1.parquet
+    let tmpdir = TempDir::new().unwrap();
+    for i in 0..num_files {
+        let target_file = tmpdir.path().join(format!("{i}.parquet"));
+        println!("Copying {file_path} to {target_file:?}");
+        std::fs::copy(&file_path, target_file).unwrap();
+    }
+
+    let provided_schema = None;
+    let sql_definition = None;
+    ctx.register_listing_table(
+        "t",
+        tmpdir.path().to_string_lossy(),
+        options.clone(),
+        provided_schema,
+        sql_definition,
+    )
+    .await
+    .unwrap();
+
+    let batches = ctx.sql("explain select int_col, string_col from t order by string_col, int_col limit 10")
+        .await
+        .expect("planing worked")
+        .collect()
+        .await
+        .expect("execution worked");
+
+    arrow::util::pretty::pretty_format_batches(&batches)
+        .unwrap()
+        .to_string()
+}
+
 #[tokio::test]
 async fn fixed_size_binary_columns() {
     let ctx = SessionContext::new();
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index de5f3749d..48716c558 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -109,6 +109,7 @@ message ListingTableScanNode {
     ParquetFormat parquet = 11;
     AvroFormat avro = 12;
   }
+  repeated datafusion.LogicalExprNode file_sort_order = 13;
 }
 
 message ViewTableScanNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index fe67b590a..6f9636f6b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6883,6 +6883,9 @@ impl serde::Serialize for ListingTableScanNode {
         if self.target_partitions != 0 {
             len += 1;
         }
+        if !self.file_sort_order.is_empty() {
+            len += 1;
+        }
         if self.file_format_type.is_some() {
             len += 1;
         }
@@ -6914,6 +6917,9 @@ impl serde::Serialize for ListingTableScanNode {
         if self.target_partitions != 0 {
             struct_ser.serialize_field("targetPartitions", &self.target_partitions)?;
         }
+        if !self.file_sort_order.is_empty() {
+            struct_ser.serialize_field("fileSortOrder", &self.file_sort_order)?;
+        }
         if let Some(v) = self.file_format_type.as_ref() {
             match v {
                 listing_table_scan_node::FileFormatType::Csv(v) => {
@@ -6951,6 +6957,8 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
             "collectStat",
             "target_partitions",
             "targetPartitions",
+            "file_sort_order",
+            "fileSortOrder",
             "csv",
             "parquet",
             "avro",
@@ -6967,6 +6975,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
             TablePartitionCols,
             CollectStat,
             TargetPartitions,
+            FileSortOrder,
             Csv,
             Parquet,
             Avro,
@@ -7000,6 +7009,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
                             "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
                             "collectStat" | "collect_stat" => Ok(GeneratedField::CollectStat),
                             "targetPartitions" | "target_partitions" => Ok(GeneratedField::TargetPartitions),
+                            "fileSortOrder" | "file_sort_order" => Ok(GeneratedField::FileSortOrder),
                             "csv" => Ok(GeneratedField::Csv),
                             "parquet" => Ok(GeneratedField::Parquet),
                             "avro" => Ok(GeneratedField::Avro),
@@ -7031,6 +7041,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
                 let mut table_partition_cols__ = None;
                 let mut collect_stat__ = None;
                 let mut target_partitions__ = None;
+                let mut file_sort_order__ = None;
                 let mut file_format_type__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
@@ -7090,6 +7101,12 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
                                 Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
                             ;
                         }
+                        GeneratedField::FileSortOrder => {
+                            if file_sort_order__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("fileSortOrder"));
+                            }
+                            file_sort_order__ = Some(map.next_value()?);
+                        }
                         GeneratedField::Csv => {
                             if file_format_type__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("csv"));
@@ -7123,6 +7140,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
                     table_partition_cols: table_partition_cols__.unwrap_or_default(),
                     collect_stat: collect_stat__.unwrap_or_default(),
                     target_partitions: target_partitions__.unwrap_or_default(),
+                    file_sort_order: file_sort_order__.unwrap_or_default(),
                     file_format_type: file_format_type__,
                 })
             }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 8962a7a8a..c84f9a179 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -132,6 +132,8 @@ pub struct ListingTableScanNode {
     pub collect_stat: bool,
     #[prost(uint32, tag="9")]
     pub target_partitions: u32,
+    #[prost(message, repeated, tag="13")]
+    pub file_sort_order: ::prost::alloc::vec::Vec<LogicalExprNode>,
     #[prost(oneof="listing_table_scan_node::FileFormatType", tags="10, 11, 12")]
     pub file_format_type: ::core::option::Option<listing_table_scan_node::FileFormatType>,
 }
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index ba0ed5eb9..f3c4c678b 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -416,6 +416,20 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .map(|expr| parse_expr(expr, ctx))
                     .collect::<Result<Vec<_>, _>>()?;
 
+                let file_sort_order = scan
+                    .file_sort_order
+                    .iter()
+                    .map(|expr| parse_expr(expr, ctx))
+                    .collect::<Result<Vec<_>, _>>()?;
+
+                // Protobuf doesn't distinguish between "not present"
+                // and empty
+                let file_sort_order = if file_sort_order.is_empty() {
+                    None
+                } else {
+                    Some(file_sort_order)
+                };
+
                 let file_format: Arc<dyn FileFormat> =
                     match scan.file_format_type.as_ref().ok_or_else(|| {
                         proto_error(format!(
@@ -451,6 +465,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     table_partition_cols: scan.table_partition_cols.clone(),
                     collect_stat: scan.collect_stat,
                     target_partitions: scan.target_partitions as usize,
+                    file_sort_order,
                 };
 
                 let config =
@@ -871,18 +886,26 @@ impl AsLogicalPlan for LogicalPlanNode {
                             listing_table.options().format
                         )));
                     };
+
+                    let options = listing_table.options();
+                    let file_sort_order =
+                        if let Some(file_sort_order) = &options.file_sort_order {
+                            file_sort_order
+                                .iter()
+                                .map(|expr| expr.try_into())
+                                .collect::<Result<Vec<protobuf::LogicalExprNode>, _>>()?
+                        } else {
+                            vec![]
+                        };
+
                     Ok(protobuf::LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ListingScan(
                             protobuf::ListingTableScanNode {
                                 file_format_type: Some(file_format_type),
                                 table_name: table_name.to_owned(),
-                                collect_stat: listing_table.options().collect_stat,
-                                file_extension: listing_table
-                                    .options()
-                                    .file_extension
-                                    .clone(),
-                                table_partition_cols: listing_table
-                                    .options()
+                                collect_stat: options.collect_stat,
+                                file_extension: options.file_extension.clone(),
+                                table_partition_cols: options
                                     .table_partition_cols
                                     .clone(),
                                 paths: listing_table
@@ -893,10 +916,8 @@ impl AsLogicalPlan for LogicalPlanNode {
                                 schema: Some(schema),
                                 projection,
                                 filters,
-                                target_partitions: listing_table
-                                    .options()
-                                    .target_partitions
-                                    as u32,
+                                target_partitions: options.target_partitions as u32,
+                                file_sort_order,
                             },
                         )),
                     })
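
A side note on the round-trip above: protobuf repeated fields cannot represent `Option::None`, so the writer encodes `None` as an empty list and the reader maps an empty list back to `None` (hence the "Protobuf doesn't distinguish between 'not present' and empty" comment). A minimal sketch of that convention, with plain strings standing in for `LogicalExprNode`:

```rust
// Stand-in for the encode path: None becomes an empty repeated field.
fn encode_sort_order(order: Option<Vec<String>>) -> Vec<String> {
    order.unwrap_or_default()
}

// Stand-in for the decode path: an empty repeated field becomes None.
// Note decode(encode(x)) == x, except that Some(vec![]) collapses to None.
fn decode_sort_order(order: Vec<String>) -> Option<Vec<String>> {
    if order.is_empty() {
        None
    } else {
        Some(order)
    }
}
```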
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 41066584e..f1d06c46a 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -149,6 +149,7 @@ impl TestParquetFile {
             limit: None,
             table_partition_cols: vec![],
             config_options: config_options.into_shareable(),
+            output_ordering: None,
         };
 
         let df_schema = self.schema.clone().to_dfschema_ref()?;