You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/30 17:54:17 UTC
[arrow-datafusion] branch main updated: Support Defining Ordering Equivalence at the Source (#6469)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c7bfe15b49 Support Defining Ordering Equivalence at the Source (#6469)
c7bfe15b49 is described below
commit c7bfe15b4940ebff39f466212ccf32e891db7243
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Tue May 30 20:54:10 2023 +0300
Support Defining Ordering Equivalence at the Source (#6469)
* Convert ordering equivalence to vec (unit test)
* compiles
* Simplifications
* simplifications
* Remove unnecessary codes
* simplifications
* Add test cases
* fix bug
* simplifications
* Resolve linter errors
* remove unnecessary codes
* simplifications
* simplifications
* Remove unnecessary codes
* Add pruning to ordering_equivalence projection
* Remove unnecessary clones
* Convert get range to calculate compatible ranges
* Multiple ordering is supported, plan display will be fixed
* Simplifications
* Update comments
* Minor changes
* Reduce code duplication for ordering_eq_prop function
* Simplifications
* simplifications
* simplifications
* simplifications
* Simplifications
* Comments/API change/Proto naming
* Update comments
* Use builder style for ordering equivalence creation
* Adapt order_by_to_sort_expr() change
* Move LexOrdering definition to common place
* All Vec<OrderByExpr>'s are replaced with new type LexOrdering
---------
Co-authored-by: Mustafa Akur <mu...@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion-cli/Cargo.lock | 30 ++--
datafusion-examples/examples/csv_opener.rs | 2 +-
datafusion-examples/examples/json_opener.rs | 2 +-
datafusion/core/src/datasource/file_format/mod.rs | 2 +-
.../core/src/datasource/file_format/options.rs | 2 +-
datafusion/core/src/datasource/listing/table.rs | 72 ++++----
.../core/src/datasource/listing_table_factory.rs | 8 +-
.../combine_partial_final_agg.rs | 2 +-
.../src/physical_optimizer/dist_enforcement.rs | 8 +-
.../core/src/physical_optimizer/repartition.rs | 8 +-
.../src/physical_optimizer/sort_enforcement.rs | 4 +-
.../src/physical_plan/file_format/arrow_file.rs | 20 ++-
.../core/src/physical_plan/file_format/avro.rs | 28 +++-
.../core/src/physical_plan/file_format/csv.rs | 23 ++-
.../src/physical_plan/file_format/file_stream.rs | 2 +-
.../core/src/physical_plan/file_format/json.rs | 31 ++--
.../core/src/physical_plan/file_format/mod.rs | 33 ++--
.../core/src/physical_plan/file_format/parquet.rs | 54 +++---
.../src/physical_plan/joins/symmetric_hash_join.rs | 18 +-
datafusion/core/src/physical_plan/mod.rs | 39 ++++-
datafusion/core/src/test/mod.rs | 4 +-
datafusion/core/src/test_util/mod.rs | 2 +-
datafusion/core/src/test_util/parquet.rs | 2 +-
datafusion/core/tests/fifo.rs | 18 +-
datafusion/core/tests/parquet/custom_reader.rs | 2 +-
datafusion/core/tests/parquet/page_pruning.rs | 2 +-
datafusion/core/tests/row.rs | 2 +-
datafusion/core/tests/sql/order.rs | 5 +-
datafusion/core/tests/sql/parquet.rs | 6 +-
.../test_files/create_external_table.slt | 4 -
.../tests/sqllogictests/test_files/groupby.slt | 55 ++++++
datafusion/expr/src/logical_plan/ddl.rs | 2 +-
datafusion/physical-expr/src/equivalence.rs | 19 +++
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-expr/src/sort_expr.rs | 3 +
datafusion/proto/proto/datafusion.proto | 14 +-
datafusion/proto/src/generated/pbjson.rs | 184 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 18 +-
datafusion/proto/src/logical_plan/mod.rs | 107 ++++++------
datafusion/proto/src/physical_plan/from_proto.rs | 43 +++--
datafusion/proto/src/physical_plan/mod.rs | 2 +-
datafusion/proto/src/physical_plan/to_proto.rs | 32 ++--
datafusion/sql/src/parser.rs | 28 ++--
datafusion/sql/src/statement.rs | 42 ++---
datafusion/substrait/src/physical_plan/consumer.rs | 2 +-
.../substrait/tests/roundtrip_physical_plan.rs | 2 +-
46 files changed, 686 insertions(+), 304 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 57513528be..6cc9fa2011 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -302,7 +302,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -1370,7 +1370,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -2233,7 +2233,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -2292,9 +2292,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
-version = "1.0.58"
+version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8"
+checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b"
dependencies = [
"unicode-ident",
]
@@ -2311,9 +2311,9 @@ dependencies = [
[[package]]
name = "quote"
-version = "1.0.27"
+version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500"
+checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488"
dependencies = [
"proc-macro2",
]
@@ -2659,7 +2659,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -2838,9 +2838,9 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.16"
+version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01"
+checksum = "45b6ddbb36c5b969c182aec3c4a0bce7df3fbad4b77114706a49aacc80567388"
dependencies = [
"proc-macro2",
"quote",
@@ -2892,7 +2892,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -2982,7 +2982,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -3080,7 +3080,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
]
[[package]]
@@ -3244,7 +3244,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
"wasm-bindgen-shared",
]
@@ -3278,7 +3278,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.16",
+ "syn 2.0.17",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 0143b0297e..351f95cd2c 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -64,7 +64,7 @@ async fn main() -> Result<()> {
projection: Some(vec![12, 0]),
limit: Some(5),
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 843bed4f61..6c7d5ae3d7 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -68,7 +68,7 @@ async fn main() -> Result<()> {
projection: Some(vec![1, 0]),
limit: Some(5),
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 28f798ade4..cffd6b9caf 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -141,7 +141,7 @@ pub(crate) mod test_util {
projection,
limit,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 4024da3313..3e802362d3 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -443,7 +443,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
// TODO: Add file sort order into CsvReadOptions and introduce here.
- .with_file_sort_order(None)
+ .with_file_sort_order(vec![])
.with_infinite_source(self.infinite)
}
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 8bb3818c91..bea3f837cb 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -27,7 +27,7 @@ use dashmap::DashMap;
use datafusion_common::ToDFSchema;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, PhysicalSortExpr};
+use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -44,14 +44,12 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
+use crate::physical_plan::file_format::FileScanConfig;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
- physical_plan::{
- empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
- Statistics,
- },
+ physical_plan::{empty::EmptyExec, project_schema, ExecutionPlan, Statistics},
};
use super::PartitionedFile;
@@ -222,7 +220,7 @@ pub struct ListingOptions {
/// Group files to avoid that the number of partitions exceeds
/// this limit
pub target_partitions: usize,
- /// Optional pre-known sort order. Must be `SortExpr`s.
+ /// Optional pre-known sort order(s). Must be `SortExpr`s.
///
/// DataFusion may take advantage of this ordering to omit sorts
/// or use more efficient algorithms. Currently sortedness must be
@@ -231,7 +229,12 @@ pub struct ListingOptions {
/// parquet metadata.
///
/// See <https://github.com/apache/arrow-datafusion/issues/4177>
- pub file_sort_order: Option<Vec<Expr>>,
+ /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
+ /// where each ordering consists of an individual lexicographic
+ /// ordering (encapsulated by a `Vec<Expr>`). If there aren't
+ /// multiple equivalent orderings, the outer `Vec` will have a
+ /// single element.
+ pub file_sort_order: Vec<Vec<Expr>>,
/// Infinite source means that the input is not guaranteed to end.
/// Currently, CSV, JSON, and AVRO formats are supported.
/// In order to support infinite inputs, DataFusion may adjust query
@@ -253,7 +256,7 @@ impl ListingOptions {
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
- file_sort_order: None,
+ file_sort_order: vec![],
infinite_source: false,
}
}
@@ -407,9 +410,9 @@ impl ListingOptions {
/// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
///
/// // Tell datafusion that the files are sorted by column "a"
- /// let file_sort_order = Some(vec![
+ /// let file_sort_order = vec![vec![
/// col("a").sort(true, true)
- /// ]);
+ /// ]];
///
/// let listing_options = ListingOptions::new(Arc::new(
/// ParquetFormat::default()
@@ -418,7 +421,7 @@ impl ListingOptions {
///
/// assert_eq!(listing_options.file_sort_order, file_sort_order);
/// ```
- pub fn with_file_sort_order(mut self, file_sort_order: Option<Vec<Expr>>) -> Self {
+ pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
@@ -612,16 +615,12 @@ impl ListingTable {
}
/// If file_sort_order is specified, creates the appropriate physical expressions
- fn try_create_output_ordering(&self) -> Result<Option<Vec<PhysicalSortExpr>>> {
- let file_sort_order =
- if let Some(file_sort_order) = self.options.file_sort_order.as_ref() {
- file_sort_order
- } else {
- return Ok(None);
- };
+ fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
+ let mut all_sort_orders = vec![];
- // convert each expr to a physical sort expr
- let sort_exprs = file_sort_order
+ for exprs in &self.options.file_sort_order {
+ // Construct PhysicalSortExpr objects from Expr objects:
+ let sort_exprs = exprs
.iter()
.map(|expr| {
if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
@@ -637,7 +636,7 @@ impl ListingTable {
}
else {
Err(DataFusionError::Plan(
- format!("Only support single column references in output_ordering, got {expr:?}")
+ format!("Expected single column references in output_ordering, got {expr:?}")
))
}
} else {
@@ -647,8 +646,9 @@ impl ListingTable {
}
})
.collect::<Result<Vec<_>>>()?;
-
- Ok(Some(sort_exprs))
+ all_sort_orders.push(sort_exprs);
+ }
+ Ok(all_sort_orders)
}
}
@@ -956,40 +956,38 @@ mod tests {
// (file_sort_order, expected_result)
let cases = vec![
- (None, Ok(None)),
- // empty list
- (Some(vec![]), Ok(Some(vec![]))),
+ (vec![], Ok(vec![])),
// not a sort expr
(
- Some(vec![col("string_col")]),
+ vec![vec![col("string_col")]],
Err("Expected Expr::Sort in output_ordering, but got string_col"),
),
// sort expr, but non column
(
- Some(vec![
+ vec![vec![
col("int_col").add(lit(1)).sort(true, true),
- ]),
- Err("Only support single column references in output_ordering, got int_col + Int32(1)"),
+ ]],
+ Err("Expected single column references in output_ordering, got int_col + Int32(1)"),
),
// ok with one column
(
- Some(vec![col("string_col").sort(true, false)]),
- Ok(Some(vec![PhysicalSortExpr {
+ vec![vec![col("string_col").sort(true, false)]],
+ Ok(vec![vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
- }]))
+ }]])
),
// ok with two columns, different options
(
- Some(vec![
+ vec![vec![
col("string_col").sort(true, false),
col("int_col").sort(false, true),
- ]),
- Ok(Some(vec![
+ ]],
+ Ok(vec![vec![
PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
@@ -1004,7 +1002,7 @@ mod tests {
nulls_first: true,
},
},
- ]))
+ ]])
),
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index dfc3d11732..7d10fc8e0e 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -128,12 +128,6 @@ impl TableProviderFactory for ListingTableFactory {
(Some(schema), table_partition_cols)
};
- let file_sort_order = if cmd.order_exprs.is_empty() {
- None
- } else {
- Some(cmd.order_exprs.clone())
- };
-
// look for 'infinite' as an option
let infinite_source = cmd.unbounded;
@@ -143,7 +137,7 @@ impl TableProviderFactory for ListingTableFactory {
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_infinite_source(infinite_source)
- .with_file_sort_order(file_sort_order);
+ .with_file_sort_order(cmd.order_exprs.clone());
let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 5657c62921..fdef7d54b8 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -261,7 +261,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 5c2a4d27f6..6c425b1740 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1000,11 +1000,11 @@ mod tests {
}
fn parquet_exec() -> Arc<ParquetExec> {
- parquet_exec_with_sort(None)
+ parquet_exec_with_sort(vec![])
}
fn parquet_exec_with_sort(
- output_ordering: Option<Vec<PhysicalSortExpr>>,
+ output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
@@ -1025,7 +1025,7 @@ mod tests {
// Created a sorted parquet exec with multiple files
fn parquet_exec_multiple_sorted(
- output_ordering: Option<Vec<PhysicalSortExpr>>,
+ output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
@@ -2133,7 +2133,7 @@ mod tests {
}];
// Scan some sorted parquet files
- let exec = parquet_exec_multiple_sorted(Some(sort_key.clone()));
+ let exec = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
// CoalesceBatchesExec to mimic behavior after a filter
let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 8b407ed289..7db54eee51 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -356,7 +356,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -378,7 +378,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -402,7 +402,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: Some(sort_exprs),
+ output_ordering: vec![sort_exprs],
infinite_source: false,
},
None,
@@ -429,7 +429,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: Some(sort_exprs),
+ output_ordering: vec![sort_exprs],
infinite_source: false,
},
None,
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index f71c79e9fc..22f7d509c8 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -2805,7 +2805,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -2829,7 +2829,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: Some(sort_exprs),
+ output_ordering: vec![sort_exprs],
infinite_source: false,
},
None,
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
index d229031d37..72a6d0a0b4 100644
--- a/datafusion/core/src/physical_plan/file_format/arrow_file.rs
+++ b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
@@ -22,12 +22,15 @@ use crate::physical_plan::file_format::{
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream,
};
use arrow_schema::SchemaRef;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{
+ LexOrdering, OrderingEquivalenceProperties, PhysicalSortExpr,
+};
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
use std::any::Any;
@@ -40,7 +43,7 @@ pub struct ArrowExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+ projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -83,7 +86,16 @@ impl ExecutionPlan for ArrowExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering.as_deref()
+ self.projected_output_ordering
+ .first()
+ .map(|ordering| ordering.as_slice())
+ }
+
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ ordering_equivalence_properties_helper(
+ self.schema(),
+ &self.projected_output_ordering,
+ )
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 16dc146750..5b407d5498 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -17,14 +17,17 @@
//! Execution plan for reading line-delimited Avro files
use crate::error::Result;
+use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
};
+
use arrow::datatypes::SchemaRef;
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
-use crate::execution::context::TaskContext;
-use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use std::any::Any;
use std::sync::Arc;
@@ -37,7 +40,7 @@ pub struct AvroExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+ projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -80,7 +83,16 @@ impl ExecutionPlan for AvroExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering.as_deref()
+ self.projected_output_ordering
+ .first()
+ .map(|ordering| ordering.as_slice())
+ }
+
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ ordering_equivalence_properties_helper(
+ self.schema(),
+ &self.projected_output_ordering,
+ )
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -265,7 +277,7 @@ mod tests {
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -337,7 +349,7 @@ mod tests {
projection,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -409,7 +421,7 @@ mod tests {
statistics: Statistics::default(),
limit: None,
table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index f19cf0743a..1dde376315 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -20,6 +20,7 @@
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
+use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
@@ -27,15 +28,14 @@ use crate::physical_plan::file_format::file_stream::{
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
-use bytes::Buf;
-
-use crate::physical_plan::common::AbortOnDropSingle;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use futures::ready;
use futures::{StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectStore};
@@ -54,7 +54,7 @@ pub struct CsvExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+ projected_output_ordering: Vec<LexOrdering>,
has_header: bool,
delimiter: u8,
/// Execution metrics
@@ -121,7 +121,16 @@ impl ExecutionPlan for CsvExec {
/// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering.as_deref()
+ self.projected_output_ordering
+ .first()
+ .map(|ordering| ordering.as_slice())
+ }
+
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ ordering_equivalence_properties_helper(
+ self.schema(),
+ &self.projected_output_ordering,
+ )
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index d8d1b40726..2ca9a076cf 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -472,7 +472,7 @@ mod tests {
projection: None,
limit,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
let metrics_set = ExecutionPlanMetricsSet::new();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 071c51b8c0..dcf23fdb25 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -19,6 +19,7 @@
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
+use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
@@ -26,14 +27,15 @@ use crate::physical_plan::file_format::file_stream::{
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
};
+
+use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
use bytes::{Buf, Bytes};
-
-use crate::physical_plan::common::AbortOnDropSingle;
-use arrow::json::ReaderBuilder;
use futures::{ready, stream, StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectStore};
use std::any::Any;
@@ -52,7 +54,7 @@ pub struct NdJsonExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+ projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
file_compression_type: FileCompressionType,
@@ -101,7 +103,16 @@ impl ExecutionPlan for NdJsonExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering.as_deref()
+ self.projected_output_ordering
+ .first()
+ .map(|ordering| ordering.as_slice())
+ }
+
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ ordering_equivalence_properties_helper(
+ self.schema(),
+ &self.projected_output_ordering,
+ )
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -427,7 +438,7 @@ mod tests {
projection: None,
limit: Some(3),
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
file_compression_type.to_owned(),
@@ -503,7 +514,7 @@ mod tests {
projection: None,
limit: Some(3),
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
file_compression_type.to_owned(),
@@ -549,7 +560,7 @@ mod tests {
projection: Some(vec![0, 2]),
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
file_compression_type.to_owned(),
@@ -600,7 +611,7 @@ mod tests {
projection: Some(vec![3, 0, 2]),
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
file_compression_type.to_owned(),
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index a05c0cf8c8..72dd262b05 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -38,7 +38,7 @@ use arrow::{
};
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
pub(crate) use json::plan_to_json;
pub use json::{JsonOpener, NdJsonExec};
@@ -155,15 +155,15 @@ pub struct FileScanConfig {
pub limit: Option<usize>,
/// The partitioning columns
pub table_partition_cols: Vec<(String, DataType)>,
- /// The order in which the data is sorted, if known.
- pub output_ordering: Option<Vec<PhysicalSortExpr>>,
+ /// All equivalent lexicographical orderings that describe the schema.
+ pub output_ordering: Vec<LexOrdering>,
/// Indicates whether this plan may produce an infinite stream of records.
pub infinite_source: bool,
}
impl FileScanConfig {
/// Project the schema and the statistics on the given column indices
- fn project(&self) -> (SchemaRef, Statistics, Option<Vec<PhysicalSortExpr>>) {
+ fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
return (
Arc::clone(&self.file_schema),
@@ -250,7 +250,7 @@ impl Debug for FileScanConfig {
impl Display for FileScanConfig {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
- let (schema, _, ordering) = self.project();
+ let (schema, _, orderings) = self.project();
write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?;
@@ -266,11 +266,12 @@ impl Display for FileScanConfig {
write!(f, ", infinite_source=true")?;
}
- if let Some(orders) = ordering {
- if !orders.is_empty() {
- write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?;
+ if let Some(ordering) = orderings.first() {
+ if !ordering.is_empty() {
+ write!(f, ", output_ordering={}", OutputOrderingDisplay(ordering))?;
}
}
+
Ok(())
}
}
@@ -847,14 +848,15 @@ impl From<ObjectMeta> for FileMeta {
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
-) -> Option<Vec<PhysicalSortExpr>> {
- let mut new_ordering = vec![];
- if let Some(output_ordering) = &base_config.output_ordering {
+) -> Vec<Vec<PhysicalSortExpr>> {
+ let mut all_orderings = vec![];
+ for output_ordering in &base_config.output_ordering {
if base_config.file_groups.iter().any(|group| group.len() > 1) {
debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
- output_ordering, base_config.file_groups);
- return None;
+ base_config.output_ordering[0], base_config.file_groups);
+ return vec![];
}
+ let mut new_ordering = vec![];
for PhysicalSortExpr { expr, options } in output_ordering {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let name = col.name();
@@ -871,8 +873,9 @@ fn get_projected_output_ordering(
// since rest of the orderings are violated
break;
}
+ all_orderings.push(new_ordering);
}
- (!new_ordering.is_empty()).then_some(new_ordering)
+ all_orderings
}
#[cfg(test)]
@@ -1330,7 +1333,7 @@ mod tests {
projection,
statistics,
table_partition_cols,
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 9843ecc22e..3b622423f2 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,8 +17,6 @@
//! Execution plan for reading Parquet files
-use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_physical_expr::PhysicalExpr;
use fmt::Debug;
use std::any::Any;
use std::cmp::min;
@@ -27,25 +25,32 @@ use std::fs;
use std::ops::Range;
use std::sync::Arc;
-use crate::config::ConfigOptions;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
-use crate::physical_plan::file_format::FileMeta;
+use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
use crate::{
+ config::ConfigOptions,
datasource::listing::FileRange,
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
+ common::AbortOnDropSingle,
expressions::PhysicalSortExpr,
- file_format::{FileScanConfig, SchemaAdapter},
+ file_format::{FileMeta, FileScanConfig, SchemaAdapter},
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
+ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
},
};
+
+use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::ArrowError;
+use datafusion_physical_expr::{
+ LexOrdering, OrderingEquivalenceProperties, PhysicalExpr,
+};
+
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
@@ -64,8 +69,6 @@ mod page_filter;
mod row_filter;
mod row_groups;
-use crate::physical_plan::common::AbortOnDropSingle;
-use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
pub use metrics::ParquetFileMetrics;
#[derive(Default)]
@@ -90,7 +93,7 @@ pub struct ParquetExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+ projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
@@ -342,7 +345,16 @@ impl ExecutionPlan for ParquetExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering.as_deref()
+ self.projected_output_ordering
+ .first()
+ .map(|ordering| ordering.as_slice())
+ }
+
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ ordering_equivalence_properties_helper(
+ self.schema(),
+ &self.projected_output_ordering,
+ )
}
fn with_new_children(
@@ -900,7 +912,7 @@ mod tests {
projection,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
predicate,
@@ -1549,7 +1561,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -1663,7 +1675,7 @@ mod tests {
),
),
],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -1725,7 +1737,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -1917,7 +1929,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -1954,7 +1966,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -1996,7 +2008,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -2034,7 +2046,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -2073,7 +2085,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
@@ -2102,7 +2114,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 5602ed4cf3..3bd177e03b 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -2415,16 +2415,14 @@ mod tests {
Field::new("a2", DataType::UInt32, false),
]));
// Specify the ordering:
- let file_sort_order = Some(
- [datafusion_expr::col("a1")]
- .into_iter()
- .map(|e| {
- let ascending = true;
- let nulls_first = false;
- e.sort(ascending, nulls_first)
- })
- .collect::<Vec<_>>(),
- );
+ let file_sort_order = vec![[datafusion_expr::col("a1")]
+ .into_iter()
+ .map(|e| {
+ let ascending = true;
+ let nulls_first = false;
+ e.sort(ascending, nulls_first)
+ })
+ .collect::<Vec<_>>()];
register_unbounded_file_with_ordering(
&ctx,
schema.clone(),
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index a9881ad435..2c3a3e9b49 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -576,6 +576,43 @@ impl PartialEq for Partitioning {
}
}
+/// Retrieves the ordering equivalence properties for a given schema and output ordering.
+pub fn ordering_equivalence_properties_helper(
+ schema: SchemaRef,
+ eq_orderings: &[LexOrdering],
+) -> OrderingEquivalenceProperties {
+ let mut oep = OrderingEquivalenceProperties::new(schema);
+ let first_ordering = if let Some(first) = eq_orderings.first() {
+ first
+ } else {
+ // Return an empty OrderingEquivalenceProperties:
+ return oep;
+ };
+ let first_column = first_ordering
+ .iter()
+ .map(|e| TryFrom::try_from(e.clone()))
+ .collect::<Result<Vec<_>>>();
+ let checked_column_first = if let Ok(first) = first_column {
+ first
+ } else {
+ // Return an empty OrderingEquivalenceProperties:
+ return oep;
+ };
+ // First entry among eq_orderings is the head, skip it:
+ for ordering in eq_orderings.iter().skip(1) {
+ let column = ordering
+ .iter()
+ .map(|e| TryFrom::try_from(e.clone()))
+ .collect::<Result<Vec<_>>>();
+ if let Ok(column) = column {
+ if !column.is_empty() {
+ oep.add_equal_conditions((&checked_column_first, &column))
+ }
+ }
+ }
+ oep
+}
+
/// Distribution schemes
#[derive(Debug, Clone)]
pub enum Distribution {
@@ -606,7 +643,7 @@ impl Distribution {
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{
- expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
+ expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, LexOrdering,
};
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index f99071a6f1..6e4d038b4a 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -195,7 +195,7 @@ pub fn partitioned_csv_config(
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
})
}
@@ -344,7 +344,7 @@ pub fn csv_exec_sorted(
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: Some(sort_exprs),
+ output_ordering: vec![sort_exprs],
infinite_source,
},
false,
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index d42379b82a..993ca9c186 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -518,7 +518,7 @@ pub async fn register_unbounded_file_with_ordering(
schema: SchemaRef,
file_path: &Path,
table_name: &str,
- file_sort_order: Option<Vec<Expr>>,
+ file_sort_order: Vec<Vec<Expr>>,
with_unbounded_execution: bool,
) -> Result<()> {
// Mark infinite and provide schema:
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 14215cc9a0..17766142bd 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -150,7 +150,7 @@ impl TestParquetFile {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index f5a62ddee4..e972d39f4a 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -254,16 +254,14 @@ mod unix_test {
Field::new("a2", DataType::UInt32, false),
]));
// Specify the ordering:
- let file_sort_order = Some(
- [datafusion_expr::col("a1")]
- .into_iter()
- .map(|e| {
- let ascending = true;
- let nulls_first = false;
- e.sort(ascending, nulls_first)
- })
- .collect::<Vec<_>>(),
- );
+ let file_sort_order = vec![[datafusion_expr::col("a1")]
+ .into_iter()
+ .map(|e| {
+ let ascending = true;
+ let nulls_first = false;
+ e.sort(ascending, nulls_first)
+ })
+ .collect::<Vec<_>>()];
// Set unbounded sorted files read configuration
register_unbounded_file_with_ordering(
&ctx,
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 62ca6ae859..501623dbd9 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -82,7 +82,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 1d444326bb..8be7aeb0ca 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -76,7 +76,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
Some(predicate),
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 55310bb611..f5cb0e4804 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -87,7 +87,7 @@ async fn get_exec(
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
},
None,
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
index fa6ac61244..07254c8044 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -56,10 +56,7 @@ async fn create_external_table_with_order() -> Result<()> {
let table_dyn = listing_table_factory.create(&ctx.state(), &cmd).await?;
let table = table_dyn.as_any().downcast_ref::<ListingTable>().unwrap();
assert_eq!(cmd.order_exprs.len(), 1);
- assert_eq!(
- &cmd.order_exprs,
- table.options().file_sort_order.as_ref().unwrap()
- );
+ assert_eq!(cmd.order_exprs, table.options().file_sort_order);
Ok(())
}
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 532a0414f9..0d780c6694 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -58,9 +58,7 @@ async fn parquet_with_sort_order_specified() {
let session_config = SessionConfig::new().with_target_partitions(2);
// The sort order is not specified
- let options_no_sort = parquet_read_options
- .to_listing_options(&session_config)
- .with_file_sort_order(None);
+ let options_no_sort = parquet_read_options.to_listing_options(&session_config);
// The sort order is specified (not actually correct in this case)
let file_sort_order = [col("string_col"), col("int_col")]
@@ -74,7 +72,7 @@ async fn parquet_with_sort_order_specified() {
let options_sort = parquet_read_options
.to_listing_options(&session_config)
- .with_file_sort_order(Some(file_sort_order));
+ .with_file_sort_order(vec![file_sort_order]);
// This string appears in ParquetExec if the output ordering is
// specified
diff --git a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
index ce6fdfeee0..c08d5a55c3 100644
--- a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
@@ -72,10 +72,6 @@ CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv' LOCATION 'bar.csv'
statement error DataFusion error: SQL error: ParserError\("WITH HEADER ROW specified more than once"\)
CREATE EXTERNAL TABLE t STORED AS CSV WITH HEADER ROW WITH HEADER ROW LOCATION 'foo.csv'
-# Duplicate `WITH ORDER` clause
-statement error DataFusion error: SQL error: ParserError\("WITH ORDER specified more than once"\)
-CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1) WITH ORDER (c1) LOCATION 'foo.csv'
-
# Duplicate `DELIMITER` clause
statement error DataFusion error: SQL error: ParserError\("DELIMITER specified more than once"\)
CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV DELIMITER '-' DELIMITER '+' LOCATION 'foo.csv'
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 8bac60bbba..08523648dc 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1940,6 +1940,61 @@ WITH HEADER ROW
WITH ORDER (a ASC, b ASC, c ASC)
LOCATION 'tests/data/window_2.csv';
+# Create a table with 2 ordered columns.
+# In the next step, we will expect to observe the removed sort execs.
+statement ok
+CREATE EXTERNAL TABLE multiple_ordered_table (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC)
+WITH ORDER (c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# Expected a sort exec for b DESC
+query TT
+EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY b DESC;
+----
+logical_plan
+Projection: multiple_ordered_table.a
+--Sort: multiple_ordered_table.b DESC NULLS FIRST
+----TableScan: multiple_ordered_table projection=[a, b]
+physical_plan
+ProjectionExec: expr=[a@0 as a]
+--SortExec: expr=[b@1 DESC]
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true
+
+# Final plan shouldn't have SortExec c ASC,
+# because table already satisfies this ordering.
+query TT
+EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY c ASC;
+----
+logical_plan
+Projection: multiple_ordered_table.a
+--Sort: multiple_ordered_table.c ASC NULLS LAST
+----TableScan: multiple_ordered_table projection=[a, c]
+physical_plan
+ProjectionExec: expr=[a@0 as a]
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+# Final plan shouldn't have SortExec a ASC, b ASC,
+# because table already satisfies this ordering.
+query TT
+EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY a ASC, b ASC;
+----
+logical_plan
+Projection: multiple_ordered_table.a
+--Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
+----TableScan: multiple_ordered_table projection=[a, b]
+physical_plan
+ProjectionExec: expr=[a@0 as a]
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true
+
# test_window_agg_sort
statement ok
set datafusion.execution.target_partitions = 1;
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 2b02295b79..e005f11471 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -189,7 +189,7 @@ pub struct CreateExternalTable {
/// SQL used to create the table, if available
pub definition: Option<String>,
/// Order expressions supplied by user
- pub order_exprs: Vec<Expr>,
+ pub order_exprs: Vec<Vec<Expr>>,
/// File compression type (GZIP, BZIP2, XZ, ZSTD)
pub file_compression_type: CompressionTypeVariant,
/// Whether the table is an infinite streams
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 6af72efc5a..83f26a1d06 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -22,6 +22,7 @@ use crate::{
use arrow::datatypes::SchemaRef;
use arrow_schema::SortOptions;
+use datafusion_common::DataFusionError;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
@@ -221,6 +222,24 @@ impl From<OrderedColumn> for PhysicalSortExpr {
}
}
+impl TryFrom<PhysicalSortExpr> for OrderedColumn {
+ type Error = DataFusionError;
+
+ fn try_from(value: PhysicalSortExpr) -> Result<Self, Self::Error> {
+ if let Some(col) = value.expr.as_any().downcast_ref::<Column>() {
+ Ok(OrderedColumn {
+ col: col.clone(),
+ options: value.options,
+ })
+ } else {
+ Err(DataFusionError::NotImplemented(
+ "Only Column PhysicalSortExpr's can be downcasted to OrderedColumn yet"
+ .to_string(),
+ ))
+ }
+ }
+}
+
impl From<OrderedColumn> for PhysicalSortRequirement {
fn from(value: OrderedColumn) -> Self {
PhysicalSortRequirement {
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 5dd8192def..b54bcda601 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,7 +56,7 @@ pub use equivalence::{
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement};
+pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map,
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 78cdd87db2..665a47e586 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -213,3 +213,6 @@ fn to_str(options: &SortOptions) -> &str {
(false, false) => "ASC NULLS LAST",
}
}
+
+/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
+pub type LexOrdering = Vec<PhysicalSortExpr>;
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 7c35452085..2f6367bc18 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -96,6 +96,10 @@ message ParquetFormat {
message AvroFormat {}
+message LogicalExprNodeCollection {
+ repeated LogicalExprNode logical_expr_nodes = 1;
+}
+
message ListingTableScanNode {
reserved 1; // was string table_name
OwnedTableReference table_name = 14;
@@ -112,7 +116,7 @@ message ListingTableScanNode {
ParquetFormat parquet = 11;
AvroFormat avro = 12;
}
- repeated LogicalExprNode file_sort_order = 13;
+ repeated LogicalExprNodeCollection file_sort_order = 13;
}
message ViewTableScanNode {
@@ -183,7 +187,7 @@ message CreateExternalTableNode {
string delimiter = 8;
string definition = 9;
string file_compression_type = 10;
- repeated LogicalExprNode order_exprs = 13;
+ repeated LogicalExprNodeCollection order_exprs = 13;
bool unbounded = 14;
map<string, string> options = 11;
}
@@ -1188,6 +1192,10 @@ message ScanLimit {
uint32 limit = 1;
}
+message PhysicalSortExprNodeCollection {
+ repeated PhysicalSortExprNode physical_sort_expr_nodes = 1;
+}
+
message FileScanExecConf {
// Was repeated ConfigOption options = 10;
reserved 10;
@@ -1199,7 +1207,7 @@ message FileScanExecConf {
Statistics statistics = 6;
repeated string table_partition_cols = 7;
string object_store_url = 8;
- repeated PhysicalSortExprNode output_ordering = 9;
+ repeated PhysicalSortExprNodeCollection output_ordering = 9;
}
message ParquetScanExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 6dbe25c2d8..0134c71e9a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -10847,6 +10847,98 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
deserializer.deserialize_struct("datafusion.LogicalExprNode", FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for LogicalExprNodeCollection {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.logical_expr_nodes.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser = serializer.serialize_struct("datafusion.LogicalExprNodeCollection", len)?;
+ if !self.logical_expr_nodes.is_empty() {
+ struct_ser.serialize_field("logicalExprNodes", &self.logical_expr_nodes)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for LogicalExprNodeCollection {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "logical_expr_nodes",
+ "logicalExprNodes",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ LogicalExprNodes,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "logicalExprNodes" | "logical_expr_nodes" => Ok(GeneratedField::LogicalExprNodes),
+ _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = LogicalExprNodeCollection;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ formatter.write_str("struct datafusion.LogicalExprNodeCollection")
+ }
+
+ fn visit_map<V>(self, mut map: V) -> std::result::Result<LogicalExprNodeCollection, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut logical_expr_nodes__ = None;
+ while let Some(k) = map.next_key()? {
+ match k {
+ GeneratedField::LogicalExprNodes => {
+ if logical_expr_nodes__.is_some() {
+ return Err(serde::de::Error::duplicate_field("logicalExprNodes"));
+ }
+ logical_expr_nodes__ = Some(map.next_value()?);
+ }
+ }
+ }
+ Ok(LogicalExprNodeCollection {
+ logical_expr_nodes: logical_expr_nodes__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.LogicalExprNodeCollection", FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for LogicalExtensionNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -15741,6 +15833,98 @@ impl<'de> serde::Deserialize<'de> for PhysicalSortExprNode {
deserializer.deserialize_struct("datafusion.PhysicalSortExprNode", FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for PhysicalSortExprNodeCollection {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.physical_sort_expr_nodes.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalSortExprNodeCollection", len)?;
+ if !self.physical_sort_expr_nodes.is_empty() {
+ struct_ser.serialize_field("physicalSortExprNodes", &self.physical_sort_expr_nodes)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalSortExprNodeCollection {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "physical_sort_expr_nodes",
+ "physicalSortExprNodes",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ PhysicalSortExprNodes,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "physicalSortExprNodes" | "physical_sort_expr_nodes" => Ok(GeneratedField::PhysicalSortExprNodes),
+ _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = PhysicalSortExprNodeCollection;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ formatter.write_str("struct datafusion.PhysicalSortExprNodeCollection")
+ }
+
+ fn visit_map<V>(self, mut map: V) -> std::result::Result<PhysicalSortExprNodeCollection, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut physical_sort_expr_nodes__ = None;
+ while let Some(k) = map.next_key()? {
+ match k {
+ GeneratedField::PhysicalSortExprNodes => {
+ if physical_sort_expr_nodes__.is_some() {
+ return Err(serde::de::Error::duplicate_field("physicalSortExprNodes"));
+ }
+ physical_sort_expr_nodes__ = Some(map.next_value()?);
+ }
+ }
+ }
+ Ok(PhysicalSortExprNodeCollection {
+ physical_sort_expr_nodes: physical_sort_expr_nodes__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.PhysicalSortExprNodeCollection", FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for PhysicalTryCastNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 7e48db10f3..bc8987b1d0 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -129,6 +129,12 @@ pub struct ParquetFormat {}
pub struct AvroFormat {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LogicalExprNodeCollection {
+ #[prost(message, repeated, tag = "1")]
+ pub logical_expr_nodes: ::prost::alloc::vec::Vec<LogicalExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListingTableScanNode {
#[prost(message, optional, tag = "14")]
pub table_name: ::core::option::Option<OwnedTableReference>,
@@ -149,7 +155,7 @@ pub struct ListingTableScanNode {
#[prost(uint32, tag = "9")]
pub target_partitions: u32,
#[prost(message, repeated, tag = "13")]
- pub file_sort_order: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ pub file_sort_order: ::prost::alloc::vec::Vec<LogicalExprNodeCollection>,
#[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12")]
pub file_format_type: ::core::option::Option<
listing_table_scan_node::FileFormatType,
@@ -292,7 +298,7 @@ pub struct CreateExternalTableNode {
#[prost(string, tag = "10")]
pub file_compression_type: ::prost::alloc::string::String,
#[prost(message, repeated, tag = "13")]
- pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNodeCollection>,
#[prost(bool, tag = "14")]
pub unbounded: bool,
#[prost(map = "string, string", tag = "11")]
@@ -1688,6 +1694,12 @@ pub struct ScanLimit {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalSortExprNodeCollection {
+ #[prost(message, repeated, tag = "1")]
+ pub physical_sort_expr_nodes: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileScanExecConf {
#[prost(message, repeated, tag = "1")]
pub file_groups: ::prost::alloc::vec::Vec<FileGroup>,
@@ -1704,7 +1716,7 @@ pub struct FileScanExecConf {
#[prost(string, tag = "8")]
pub object_store_url: ::prost::alloc::string::String,
#[prost(message, repeated, tag = "9")]
- pub output_ordering: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
+ pub output_ordering: ::prost::alloc::vec::Vec<PhysicalSortExprNodeCollection>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 47107affb3..d1102faee3 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -17,7 +17,7 @@
use crate::common::{byte_to_string, proto_error, str_to_byte};
use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
-use crate::protobuf::CustomTableScanNode;
+use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
use crate::{
convert_required,
protobuf::{
@@ -325,19 +325,15 @@ impl AsLogicalPlan for LogicalPlanNode {
.map(|expr| from_proto::parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
- let file_sort_order = scan
- .file_sort_order
- .iter()
- .map(|expr| from_proto::parse_expr(expr, ctx))
- .collect::<Result<Vec<_>, _>>()?;
-
- // Protobuf doesn't distinguish between "not present"
- // and empty
- let file_sort_order = if file_sort_order.is_empty() {
- None
- } else {
- Some(file_sort_order)
- };
+ let mut all_sort_orders = vec![];
+ for order in &scan.file_sort_order {
+ let file_sort_order = order
+ .logical_expr_nodes
+ .iter()
+ .map(|expr| from_proto::parse_expr(expr, ctx))
+ .collect::<Result<Vec<_>, _>>()?;
+ all_sort_orders.push(file_sort_order)
+ }
let file_format: Arc<dyn FileFormat> =
match scan.file_format_type.as_ref().ok_or_else(|| {
@@ -384,7 +380,7 @@ impl AsLogicalPlan for LogicalPlanNode {
)
.with_collect_stat(scan.collect_stat)
.with_target_partitions(scan.target_partitions as usize)
- .with_file_sort_order(file_sort_order);
+ .with_file_sort_order(all_sort_orders);
let config =
ListingTableConfig::new_with_multi_paths(table_paths.clone())
@@ -505,11 +501,15 @@ impl AsLogicalPlan for LogicalPlanNode {
)))?
}
- let order_exprs = create_extern_table
- .order_exprs
- .iter()
- .map(|expr| from_proto::parse_expr(expr, ctx))
- .collect::<Result<Vec<Expr>, _>>()?;
+ let mut order_exprs = vec![];
+ for expr in &create_extern_table.order_exprs {
+ let order_expr = expr
+ .logical_expr_nodes
+ .iter()
+ .map(|expr| from_proto::parse_expr(expr, ctx))
+ .collect::<Result<Vec<Expr>, _>>()?;
+ order_exprs.push(order_expr)
+ }
Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
@@ -849,15 +849,17 @@ impl AsLogicalPlan for LogicalPlanNode {
};
let options = listing_table.options();
- let file_sort_order =
- if let Some(file_sort_order) = &options.file_sort_order {
- file_sort_order
+
+ let mut exprs_vec: Vec<LogicalExprNodeCollection> = vec![];
+ for order in &options.file_sort_order {
+ let expr_vec = LogicalExprNodeCollection {
+ logical_expr_nodes: order
.iter()
.map(|expr| expr.try_into())
- .collect::<Result<Vec<protobuf::LogicalExprNode>, _>>()?
- } else {
- vec![]
+ .collect::<Result<Vec<_>, to_proto::Error>>()?,
};
+ exprs_vec.push(expr_vec);
+ }
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ListingScan(
@@ -880,7 +882,7 @@ impl AsLogicalPlan for LogicalPlanNode {
projection,
filters,
target_partitions: options.target_partitions as u32,
- file_sort_order,
+ file_sort_order: exprs_vec,
},
)),
})
@@ -1188,28 +1190,39 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded,
options,
},
- )) => Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
- protobuf::CreateExternalTableNode {
- name: Some(name.clone().into()),
- location: location.clone(),
- file_type: file_type.clone(),
- has_header: *has_header,
- schema: Some(df_schema.try_into()?),
- table_partition_cols: table_partition_cols.clone(),
- if_not_exists: *if_not_exists,
- delimiter: String::from(*delimiter),
- order_exprs: order_exprs
+ )) => {
+ let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
+ for order in order_exprs {
+ let temp = LogicalExprNodeCollection {
+ logical_expr_nodes: order
.iter()
.map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, to_proto::Error>>()?,
- definition: definition.clone().unwrap_or_default(),
- file_compression_type: file_compression_type.to_string(),
- unbounded: *unbounded,
- options: options.clone(),
- },
- )),
- }),
+ .collect::<Result<Vec<_>, to_proto::Error>>(
+ )?,
+ };
+ converted_order_exprs.push(temp);
+ }
+
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
+ protobuf::CreateExternalTableNode {
+ name: Some(name.clone().into()),
+ location: location.clone(),
+ file_type: file_type.clone(),
+ has_header: *has_header,
+ schema: Some(df_schema.try_into()?),
+ table_partition_cols: table_partition_cols.clone(),
+ if_not_exists: *if_not_exists,
+ delimiter: String::from(*delimiter),
+ order_exprs: converted_order_exprs,
+ definition: definition.clone().unwrap_or_default(),
+ file_compression_type: file_compression_type.to_string(),
+ unbounded: *unbounded,
+ options: options.clone(),
+ },
+ )),
+ })
+ }
LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
name,
input,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index f8f55242a8..8af4815181 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -430,29 +430,28 @@ pub fn parse_protobuf_file_scan_config(
})
.collect::<Result<Vec<(String, DataType)>>>()?;
- let output_ordering = proto
- .output_ordering
- .iter()
- .map(|o| {
- let expr = o
- .expr
- .as_ref()
- .map(|e| parse_physical_expr(e.as_ref(), registry, &schema))
- .unwrap()?;
- Ok(PhysicalSortExpr {
- expr,
- options: SortOptions {
- descending: !o.asc,
- nulls_first: o.nulls_first,
- },
+ let mut output_ordering = vec![];
+ for node_collection in &proto.output_ordering {
+ let sort_expr = node_collection
+ .physical_sort_expr_nodes
+ .iter()
+ .map(|node| {
+ let expr = node
+ .expr
+ .as_ref()
+ .map(|e| parse_physical_expr(e.as_ref(), registry, &schema))
+ .unwrap()?;
+ Ok(PhysicalSortExpr {
+ expr,
+ options: SortOptions {
+ descending: !node.asc,
+ nulls_first: node.nulls_first,
+ },
+ })
})
- })
- .collect::<Result<Vec<PhysicalSortExpr>>>()?;
- let output_ordering = if output_ordering.is_empty() {
- None
- } else {
- Some(output_ordering)
- };
+ .collect::<Result<Vec<PhysicalSortExpr>>>()?;
+ output_ordering.push(sort_expr);
+ }
Ok(FileScanConfig {
object_store_url,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 4c360dc378..7385a4ac21 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1627,7 +1627,7 @@ mod roundtrip_tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index f2f65b89f0..766906d04a 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -43,7 +43,10 @@ use datafusion::physical_plan::expressions::{
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
use crate::protobuf;
-use crate::protobuf::{physical_aggregate_expr_node, PhysicalSortExprNode, ScalarValue};
+use crate::protobuf::{
+ physical_aggregate_expr_node, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
+ ScalarValue,
+};
use datafusion::logical_expr::BuiltinScalarFunction;
use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, GetIndexedFieldExpr};
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
@@ -481,21 +484,21 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
.map(|p| p.as_slice().try_into())
.collect::<Result<Vec<_>, _>>()?;
- let output_ordering = if let Some(output_ordering) = &conf.output_ordering {
- output_ordering
+ let mut output_orderings = vec![];
+ for order in &conf.output_ordering {
+ let expr_node_vec = order
.iter()
- .map(|o| {
- let expr = o.expr.clone().try_into()?;
+ .map(|sort_expr| {
+ let expr = sort_expr.expr.clone().try_into()?;
Ok(PhysicalSortExprNode {
expr: Some(Box::new(expr)),
- asc: !o.options.descending,
- nulls_first: o.options.nulls_first,
+ asc: !sort_expr.options.descending,
+ nulls_first: sort_expr.options.nulls_first,
})
})
- .collect::<Result<Vec<PhysicalSortExprNode>>>()?
- } else {
- vec![]
- };
+ .collect::<Result<Vec<PhysicalSortExprNode>>>()?;
+ output_orderings.push(expr_node_vec)
+ }
Ok(protobuf::FileScanExecConf {
file_groups,
@@ -515,7 +518,12 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
object_store_url: conf.object_store_url.to_string(),
- output_ordering,
+ output_ordering: output_orderings
+ .into_iter()
+ .map(|e| PhysicalSortExprNodeCollection {
+ physical_sort_expr_nodes: e,
+ })
+ .collect::<Vec<_>>(),
})
}
}
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index a0e928910d..38dacf35be 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -116,6 +116,9 @@ impl fmt::Display for CopyToSource {
}
}
+/// This type defines a lexicographical ordering.
+pub(crate) type LexOrdering = Vec<OrderByExpr>;
+
/// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
///
/// Syntax:
@@ -158,7 +161,7 @@ pub struct CreateExternalTable {
/// Partition Columns
pub table_partition_cols: Vec<String>,
/// Ordered expressions
- pub order_exprs: Vec<OrderByExpr>,
+ pub order_exprs: Vec<LexOrdering>,
/// Option to not error if table already exists
pub if_not_exists: bool,
/// File compression type (GZIP, BZIP2, XZ)
@@ -585,7 +588,7 @@ impl<'a> DFParser<'a> {
delimiter: Option<char>,
file_compression_type: Option<CompressionTypeVariant>,
table_partition_cols: Option<Vec<String>>,
- order_exprs: Option<Vec<OrderByExpr>>,
+ order_exprs: Vec<LexOrdering>,
options: Option<HashMap<String, String>>,
}
let mut builder = Builder::default();
@@ -621,8 +624,7 @@ impl<'a> DFParser<'a> {
}
Keyword::WITH => {
if self.parser.parse_keyword(Keyword::ORDER) {
- ensure_not_set(&builder.order_exprs, "WITH ORDER")?;
- builder.order_exprs = Some(self.parse_order_by_exprs()?);
+ builder.order_exprs.push(self.parse_order_by_exprs()?);
} else {
self.parser.expect_keyword(Keyword::HEADER)?;
self.parser.expect_keyword(Keyword::ROW)?;
@@ -689,7 +691,7 @@ impl<'a> DFParser<'a> {
delimiter: builder.delimiter.unwrap_or(','),
location: builder.location.unwrap(),
table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]),
- order_exprs: builder.order_exprs.unwrap_or(vec![]),
+ order_exprs: builder.order_exprs,
if_not_exists,
file_compression_type: builder
.file_compression_type
@@ -1124,14 +1126,14 @@ mod tests {
delimiter: ',',
location: "foo.csv".into(),
table_partition_cols: vec![],
- order_exprs: vec![OrderByExpr {
+ order_exprs: vec![vec![OrderByExpr {
expr: Identifier(Ident {
value: "c1".to_owned(),
quote_style: None,
}),
asc,
nulls_first,
- }],
+ }]],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
unbounded: false,
@@ -1154,7 +1156,7 @@ mod tests {
delimiter: ',',
location: "foo.csv".into(),
table_partition_cols: vec![],
- order_exprs: vec![
+ order_exprs: vec![vec![
OrderByExpr {
expr: Identifier(Ident {
value: "c1".to_owned(),
@@ -1171,7 +1173,7 @@ mod tests {
asc: Some(false),
nulls_first: Some(true),
},
- ],
+ ]],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
unbounded: false,
@@ -1193,7 +1195,7 @@ mod tests {
delimiter: ',',
location: "foo.csv".into(),
table_partition_cols: vec![],
- order_exprs: vec![OrderByExpr {
+ order_exprs: vec![vec![OrderByExpr {
expr: Expr::BinaryOp {
left: Box::new(Identifier(Ident {
value: "c1".to_owned(),
@@ -1207,7 +1209,7 @@ mod tests {
},
asc: Some(true),
nulls_first: None,
- }],
+ }]],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
unbounded: false,
@@ -1238,7 +1240,7 @@ mod tests {
delimiter: '*',
location: "foo.parquet".into(),
table_partition_cols: vec!["c1".into()],
- order_exprs: vec![OrderByExpr {
+ order_exprs: vec![vec![OrderByExpr {
expr: Expr::BinaryOp {
left: Box::new(Identifier(Ident {
value: "c1".to_owned(),
@@ -1252,7 +1254,7 @@ mod tests {
},
asc: Some(true),
nulls_first: None,
- }],
+ }]],
if_not_exists: true,
file_compression_type: CompressionTypeVariant::ZSTD,
unbounded: true,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index dada4b147b..7f914c6b91 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::parser::{
- CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
+ CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt, LexOrdering,
Statement as DFStatement,
};
use crate::planner::{
@@ -44,9 +44,9 @@ use datafusion_expr::{
};
use sqlparser::ast;
use sqlparser::ast::{
- Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
- SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement,
- TableConstraint, TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
+ Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, Query, SchemaName,
+ SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableConstraint,
+ TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
};
use datafusion_expr::expr::Placeholder;
@@ -578,10 +578,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn build_order_by(
&self,
- order_exprs: Vec<OrderByExpr>,
+ order_exprs: Vec<LexOrdering>,
schema: &DFSchemaRef,
planner_context: &mut PlannerContext,
- ) -> Result<Vec<datafusion_expr::Expr>> {
+ ) -> Result<Vec<Vec<datafusion_expr::Expr>>> {
// Ask user to provide a schema if schema is empty.
if !order_exprs.is_empty() && schema.fields().is_empty() {
return Err(DataFusionError::Plan(
@@ -589,22 +589,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.to_owned(),
));
}
- // Convert each OrderByExpr to a SortExpr:
- let result = self.order_by_to_sort_expr(&order_exprs, schema, planner_context)?;
- // Verify that columns of all SortExprs exist in the schema:
- for expr in result.iter() {
- for column in expr.to_columns()?.iter() {
- if !schema.has_column(column) {
- // Return an error if any column is not in the schema:
- return Err(DataFusionError::Plan(format!(
- "Column {column} is not in schema"
- )));
+
+ let mut all_results = vec![];
+ for expr in order_exprs {
+ // Convert each OrderByExpr to a SortExpr:
+ let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?;
+ // Verify that columns of all SortExprs exist in the schema:
+ for expr in expr_vec.iter() {
+ for column in expr.to_columns()?.iter() {
+ if !schema.has_column(column) {
+ // Return an error if any column is not in the schema:
+ return Err(DataFusionError::Plan(format!(
+ "Column {column} is not in schema"
+ )));
+ }
}
}
+ // If all SortExprs are valid, return them as an expression vector
+ all_results.push(expr_vec)
}
-
- // If all SortExprs are valid, return them as an expression vector
- Ok(result)
+ Ok(all_results)
}
/// Generate a logical plan from a CREATE EXTERNAL TABLE statement
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 0c65beb4e1..6273e0e589 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -112,7 +112,7 @@ pub async fn from_substrait_rel(
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
diff --git a/datafusion/substrait/tests/roundtrip_physical_plan.rs b/datafusion/substrait/tests/roundtrip_physical_plan.rs
index d213ccf187..ab77f19ea0 100644
--- a/datafusion/substrait/tests/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
- output_ordering: None,
+ output_ordering: vec![],
infinite_source: false,
};
let parquet_exec: Arc<dyn ExecutionPlan> =