Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/14 17:12:05 UTC
[arrow-datafusion] branch master updated: Add ability to specify external sort information for ListingTables (#4170)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d2814c960 Add ability to specify external sort information for ListingTables (#4170)
d2814c960 is described below
commit d2814c960168b45c4a0f5d7bbb72d9f412cb08bd
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Nov 14 12:11:59 2022 -0500
Add ability to specify external sort information for ListingTables (#4170)
* Add ability to specify external sort information for ParquetExec
* Updates
* Allow any listing table to specify its output order
* Start writing tests
* more tests
* Add doc comments on multiple partitions and sort order
* Add tests for multiple files in a partition
* Ignore sort order on parquet exec if it has more than one file
* Add for other listing tables
* fmt
---
benchmarks/src/bin/tpch.rs | 1 +
.../examples/parquet_sql_multiple_files.rs | 1 +
datafusion/core/src/datasource/file_format/mod.rs | 1 +
datafusion/core/src/datasource/listing/table.rs | 166 ++++++++++++++++++++-
.../core/src/datasource/listing_table_factory.rs | 1 +
datafusion/core/src/execution/context.rs | 5 +-
datafusion/core/src/execution/options.rs | 4 +
.../core/src/physical_optimizer/enforcement.rs | 1 +
.../core/src/physical_optimizer/repartition.rs | 1 +
.../core/src/physical_plan/file_format/avro.rs | 7 +-
.../core/src/physical_plan/file_format/csv.rs | 5 +-
.../src/physical_plan/file_format/file_stream.rs | 1 +
.../core/src/physical_plan/file_format/json.rs | 7 +-
.../core/src/physical_plan/file_format/mod.rs | 81 +++++++++-
.../core/src/physical_plan/file_format/parquet.rs | 63 +++++---
datafusion/core/src/test/mod.rs | 1 +
datafusion/core/tests/custom_parquet_reader.rs | 1 +
datafusion/core/tests/parquet/page_pruning.rs | 1 +
datafusion/core/tests/row.rs | 1 +
datafusion/core/tests/sql/parquet.rs | 99 ++++++++++++
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 18 +++
datafusion/proto/src/generated/prost.rs | 2 +
datafusion/proto/src/logical_plan.rs | 43 ++++--
parquet-test-utils/src/lib.rs | 1 +
25 files changed, 473 insertions(+), 40 deletions(-)
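As context for the diff below, here is a minimal sketch of how the new `file_sort_order` option might be used from application code. It is assembled from the APIs added in this commit; the directory path and column names are hypothetical, and exact import paths may vary by DataFusion version.

```rust
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Declare that the parquet files are already sorted by
    // (string_col ASC NULLS LAST, int_col ASC NULLS LAST).
    let file_sort_order = Some(vec![
        col("string_col").sort(true, false), // ascending, nulls last
        col("int_col").sort(true, false),
    ]);
    let options = ListingOptions {
        file_sort_order,
        ..ParquetReadOptions::default().to_listing_options(1)
    };

    // Register the directory as a listing table. With the ordering declared,
    // ParquetExec can report it and downstream sorts may be omitted.
    ctx.register_listing_table(
        "t",
        "file:///path/to/sorted_parquet/",
        options,
        None, // provided schema (inferred here)
        None, // SQL definition
    )
    .await?;

    let batches = ctx
        .sql("SELECT int_col, string_col FROM t ORDER BY string_col, int_col LIMIT 10")
        .await?
        .collect()
        .await?;
    datafusion::arrow::util::pretty::print_batches(&batches)?;
    Ok(())
}
```

Note that the declared ordering is only honored when each DataFusion partition scans at most one file; see `get_output_ordering` further down in this diff.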
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 838709cf0..4517016af 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -397,6 +397,7 @@ async fn get_table(
target_partitions,
collect_stat: ctx.config.collect_statistics,
table_partition_cols: vec![],
+ file_sort_order: None,
};
let table_path = ListingTableUrl::parse(path)?;
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 6004ce67d..6e8ef2397 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -39,6 +39,7 @@ async fn main() -> Result<()> {
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
+ file_sort_order: None,
};
// Register a listing table - this will use all files in the directory as data sources
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 6775117e2..507e437a4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -124,6 +124,7 @@ pub(crate) mod test_util {
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
&[],
)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 05f560f20..ab08d6f18 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -20,9 +20,11 @@
use std::str::FromStr;
use std::{any::Any, sync::Arc};
+use arrow::compute::SortOptions;
use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use dashmap::DashMap;
+use datafusion_physical_expr::PhysicalSortExpr;
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -38,6 +40,7 @@ use crate::datasource::{
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
+use crate::physical_plan;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
@@ -54,6 +57,7 @@ use super::PartitionedFile;
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
/// Configuration for creating a 'ListingTable'
+#[derive(Debug, Clone)]
pub struct ListingTableConfig {
/// Paths on the `ObjectStore` for creating `ListingTable`.
/// They should share the same schema and object store.
@@ -162,6 +166,7 @@ impl ListingTableConfig {
file_extension,
target_partitions: ctx.config.target_partitions,
table_partition_cols: vec![],
+ file_sort_order: None,
};
Ok(Self {
@@ -198,7 +203,7 @@ impl ListingTableConfig {
}
/// Options for creating a `ListingTable`
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct ListingOptions {
/// A suffix on which files should be filtered (leave empty to
/// keep all files on the path)
@@ -220,6 +225,16 @@ pub struct ListingOptions {
/// Group files to avoid that the number of partitions exceeds
/// this limit
pub target_partitions: usize,
+ /// Optional pre-known sort order. Must be `SortExpr`s.
+ ///
+ /// DataFusion may take advantage of this ordering to omit sorts
+ /// or use more efficient algorithms. Currently sortedness must be
+ /// provided if it is known by some external mechanism, but may in
+ /// the future be automatically determined, for example using
+ /// parquet metadata.
+ ///
+ /// See <https://github.com/apache/arrow-datafusion/issues/4177>
+ pub file_sort_order: Option<Vec<Expr>>,
}
impl ListingOptions {
@@ -236,6 +251,7 @@ impl ListingOptions {
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
+ file_sort_order: None,
}
}
@@ -361,6 +377,46 @@ impl ListingTable {
pub fn options(&self) -> &ListingOptions {
&self.options
}
+
+ /// If file_sort_order is specified, creates the appropriate physical expressions
+ fn try_create_output_ordering(&self) -> Result<Option<Vec<PhysicalSortExpr>>> {
+ let file_sort_order =
+ if let Some(file_sort_order) = self.options.file_sort_order.as_ref() {
+ file_sort_order
+ } else {
+ return Ok(None);
+ };
+
+ // convert each expr to a physical sort expr
+ let sort_exprs = file_sort_order
+ .iter()
+ .map(|expr| {
+ if let Expr::Sort { expr, asc, nulls_first } = expr {
+ if let Expr::Column(col) = expr.as_ref() {
+ let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
+ Ok(PhysicalSortExpr {
+ expr,
+ options: SortOptions {
+ descending: !asc,
+ nulls_first: *nulls_first,
+ },
+ })
+ }
+ else {
+ Err(DataFusionError::Plan(
+ format!("Only support single column references in output_ordering, got {:?}", expr)
+ ))
+ }
+ } else {
+ Err(DataFusionError::Plan(
+ format!("Expected Expr::Sort in output_ordering, but got {:?}", expr)
+ ))
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Some(sort_exprs))
+ }
}
#[async_trait]
@@ -405,6 +461,7 @@ impl TableProvider for ListingTable {
statistics,
projection: projection.clone(),
limit,
+ output_ordering: self.try_create_output_ordering()?,
table_partition_cols: self.options.table_partition_cols.clone(),
config_options: ctx.config.config_options(),
},
@@ -507,6 +564,7 @@ mod tests {
};
use arrow::datatypes::DataType;
use chrono::DateTime;
+ use datafusion_common::assert_contains;
use super::*;
@@ -554,6 +612,109 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_try_create_output_ordering() {
+ let testdata = crate::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+ let table_path = ListingTableUrl::parse(filename).unwrap();
+
+ let ctx = SessionContext::new();
+ let state = ctx.state();
+ let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+ let schema = options.infer_schema(&state, &table_path).await.unwrap();
+
+ use physical_plan::expressions::col as physical_col;
+ use std::ops::Add;
+
+ // (file_sort_order, expected_result)
+ let cases = vec![
+ (None, Ok(None)),
+ // empty list
+ (Some(vec![]), Ok(Some(vec![]))),
+ // not a sort expr
+ (
+ Some(vec![col("string_col")]),
+ Err("Expected Expr::Sort in output_ordering, but got string_col"),
+ ),
+ // sort expr, but non column
+ (
+ Some(vec![
+ col("int_col").add(lit(1)).sort(true, true),
+ ]),
+ Err("Only support single column references in output_ordering, got int_col + Int32(1)"),
+ ),
+ // ok with one column
+ (
+ Some(vec![col("string_col").sort(true, false)]),
+ Ok(Some(vec![PhysicalSortExpr {
+ expr: physical_col("string_col", &schema).unwrap(),
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ }]))
+
+ ),
+ // ok with two columns, different options
+ (
+ Some(vec![
+ col("string_col").sort(true, false),
+ col("int_col").sort(false, true),
+ ]),
+ Ok(Some(vec![
+ PhysicalSortExpr {
+ expr: physical_col("string_col", &schema).unwrap(),
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ },
+ PhysicalSortExpr {
+ expr: physical_col("int_col", &schema).unwrap(),
+ options: SortOptions {
+ descending: true,
+ nulls_first: true,
+ },
+ },
+ ]))
+
+ ),
+
+ ];
+
+ for (file_sort_order, expected_result) in cases {
+ let options = ListingOptions {
+ file_sort_order,
+ ..options.clone()
+ };
+ let config = ListingTableConfig::new(table_path.clone())
+ .with_listing_options(options)
+ .with_schema(schema.clone());
+
+ let table =
+ ListingTable::try_new(config.clone()).expect("Creating the table");
+ let ordering_result = table.try_create_output_ordering();
+
+ match (expected_result, ordering_result) {
+ (Ok(expected), Ok(result)) => {
+ assert_eq!(expected, result);
+ }
+ (Err(expected), Err(result)) => {
+ // can't compare the DataFusionError directly
+ let result = result.to_string();
+ let expected = expected.to_string();
+ assert_contains!(result.to_string(), expected);
+ }
+ (expected_result, ordering_result) => {
+ panic!(
+ "expected: {:#?}\n\nactual:{:#?}",
+ expected_result, ordering_result
+ );
+ }
+ }
+ }
+ }
+
#[tokio::test]
async fn read_empty_table() -> Result<()> {
let ctx = SessionContext::new();
@@ -566,6 +727,7 @@ mod tests {
table_partition_cols: vec![String::from("p1")],
target_partitions: 4,
collect_stat: true,
+ file_sort_order: None,
};
let table_path = ListingTableUrl::parse("test:///table/").unwrap();
@@ -768,6 +930,7 @@ mod tests {
table_partition_cols: vec![],
target_partitions,
collect_stat: true,
+ file_sort_order: None,
};
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
@@ -805,6 +968,7 @@ mod tests {
table_partition_cols: vec![],
target_partitions,
collect_stat: true,
+ file_sort_order: None,
};
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 253172738..f57ffa9db 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -67,6 +67,7 @@ impl TableProviderFactory for ListingTableFactory {
file_extension: file_extension.to_owned(),
target_partitions: 1,
table_partition_cols: vec![],
+ file_sort_order: None,
};
let table_path = ListingTableUrl::parse(&cmd.location)?;
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ce88b9f20..8b7bf1ffb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -564,6 +564,7 @@ impl SessionContext {
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: cmd.table_partition_cols.clone(),
+ file_sort_order: None,
};
self.register_listing_table(
cmd.name.as_str(),
@@ -807,7 +808,7 @@ impl SessionContext {
table_path: impl AsRef<str>,
options: ListingOptions,
provided_schema: Option<SchemaRef>,
- sql: Option<String>,
+ sql_definition: Option<String>,
) -> Result<()> {
let table_path = ListingTableUrl::parse(table_path)?;
let resolved_schema = match provided_schema {
@@ -817,7 +818,7 @@ impl SessionContext {
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(resolved_schema);
- let table = ListingTable::try_new(config)?.with_definition(sql);
+ let table = ListingTable::try_new(config)?.with_definition(sql_definition);
self.register_table(name, Arc::new(table))?;
Ok(())
}
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 150a20670..041639f0f 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -151,6 +151,7 @@ impl<'a> CsvReadOptions<'a> {
file_extension: self.file_extension.to_owned(),
target_partitions,
table_partition_cols: self.table_partition_cols.clone(),
+ file_sort_order: None,
}
}
}
@@ -225,6 +226,7 @@ impl<'a> ParquetReadOptions<'a> {
file_extension: self.file_extension.to_owned(),
target_partitions,
table_partition_cols: self.table_partition_cols.clone(),
+ file_sort_order: None,
}
}
}
@@ -274,6 +276,7 @@ impl<'a> AvroReadOptions<'a> {
file_extension: self.file_extension.to_owned(),
target_partitions,
table_partition_cols: self.table_partition_cols.clone(),
+ file_sort_order: None,
}
}
}
@@ -346,6 +349,7 @@ impl<'a> NdJsonReadOptions<'a> {
file_extension: self.file_extension.to_owned(),
target_partitions,
table_partition_cols: self.table_partition_cols.clone(),
+ file_sort_order: None,
}
}
}
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 21931397f..13f6d0bfa 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1069,6 +1069,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
None,
None,
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 839908d06..f88ef4ab0 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -271,6 +271,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
None,
None,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 38178a976..59faac8fa 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use std::any::Any;
use std::sync::Arc;
-use super::FileScanConfig;
+use super::{get_output_ordering, FileScanConfig};
/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
@@ -73,7 +73,7 @@ impl ExecutionPlan for AvroExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ get_output_ordering(&self.base_config)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -235,6 +235,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -305,6 +306,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -374,6 +376,7 @@ mod tests {
limit: None,
table_partition_cols: vec!["date".to_owned()],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index faa1401d2..8b50755f4 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -43,7 +43,7 @@ use std::path::Path;
use std::sync::Arc;
use tokio::task::{self, JoinHandle};
-use super::FileScanConfig;
+use super::{get_output_ordering, FileScanConfig};
/// Execution plan for scanning a CSV file
#[derive(Debug, Clone)]
@@ -109,8 +109,9 @@ impl ExecutionPlan for CsvExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
+ /// See the comments on `get_output_ordering` in `file_format/mod.rs`: the
+ /// specified output ordering can't be preserved when a partition scans
+ /// more than one file
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ get_output_ordering(&self.base_config)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index df12f3105..063238a6d 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -368,6 +368,7 @@ mod tests {
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
};
let file_stream = FileStream::new(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 409c53639..a2eb54920 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -43,7 +43,7 @@ use std::path::Path;
use std::sync::Arc;
use tokio::task::{self, JoinHandle};
-use super::FileScanConfig;
+use super::{get_output_ordering, FileScanConfig};
/// Execution plan for scanning NdJson data source
#[derive(Debug, Clone)]
@@ -88,7 +88,7 @@ impl ExecutionPlan for NdJsonExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ get_output_ordering(&self.base_config)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -328,6 +328,7 @@ mod tests {
limit: Some(3),
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
file_compression_type.to_owned(),
);
@@ -404,6 +405,7 @@ mod tests {
limit: Some(3),
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
file_compression_type.to_owned(),
);
@@ -450,6 +452,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
file_compression_type.to_owned(),
);
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 11d0b9fad..fbeab4cbf 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -38,6 +38,7 @@ use arrow::{
record_batch::RecordBatch,
};
pub use avro::AvroExec;
+use datafusion_physical_expr::PhysicalSortExpr;
pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;
@@ -52,7 +53,7 @@ use crate::{
use arrow::array::{new_null_array, UInt16BufferBuilder};
use arrow::record_batch::RecordBatchOptions;
use lazy_static::lazy_static;
-use log::info;
+use log::{debug, info};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::{
@@ -100,6 +101,8 @@ pub struct FileScanConfig {
pub limit: Option<usize>,
/// The partitioning column names
pub table_partition_cols: Vec<String>,
+ /// The order in which the data is sorted, if known.
+ pub output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Configuration options passed to the physical plans
pub config_options: Arc<RwLock<ConfigOptions>>,
}
@@ -445,6 +448,81 @@ impl From<ObjectMeta> for FileMeta {
}
}
+/// The various listing tables do not attempt to read all files
+/// concurrently; instead they read files in sequence within a
+/// partition. This is an important property as it allows plans to
+/// run against 1000s of files without trying to open them all
+/// concurrently.
+///
+/// However, it means that if more than one file is assigned to a
+/// partition, the output sort order will not be preserved, as
+/// illustrated in the following diagrams:
+///
+/// When only 1 file is assigned to each partition, each partition is
+/// correctly sorted on `(A, B, C)`
+///
+/// ```text
+///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃
+/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │
+///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃
+/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │
+///┃ │ │ ┃
+/// │ │ │ │ │ │
+///┃ │ │ ┃
+/// │ │ │ │ │ │
+///┃ │ │ ┃
+/// │ │ │ │ │ │
+///┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
+/// DataFusion DataFusion DataFusion DataFusion
+///┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃
+/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
+///
+/// ParquetExec
+///```
+///
+/// However, when more than one file is assigned to each partition, each
+/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
+/// file is scanned, the same values for A, B, and C can be repeated in
+/// the same sorted stream.
+///
+///```text
+///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
+///┃ ┌───────────────┐ ┌──────────────┐ │
+/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃
+///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │
+/// │ └───────────────┘ │ │ └──────────────┘ ┃
+///┃ ┌───────────────┐ ┌──────────────┐ │
+/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃
+///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │
+/// │ └───────────────┘ │ │ └──────────────┘ ┃
+///┃ │
+/// │ │ │ ┃
+///┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+/// DataFusion DataFusion ┃
+///┃ Partition 1 Partition 2
+/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
+///
+/// ParquetExec
+///```
+pub(crate) fn get_output_ordering(
+ base_config: &FileScanConfig,
+) -> Option<&[PhysicalSortExpr]> {
+ if let Some(output_ordering) = base_config.output_ordering.as_ref() {
+ if base_config.file_groups.iter().any(|group| group.len() > 1) {
+ debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
+ output_ordering, base_config.file_groups);
+ None
+ } else {
+ Some(output_ordering)
+ }
+ } else {
+ None
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::{
@@ -710,6 +788,7 @@ mod tests {
statistics,
table_partition_cols,
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
}
}
}
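To make the rule above concrete outside of DataFusion's own types, here is a small standalone restatement of the check that `get_output_ordering` performs on file groups; the file names are placeholders, and plain strings stand in for `PartitionedFile`.

```rust
/// A declared sort order can only be propagated when every DataFusion
/// partition (file group) scans at most one file.
fn can_propagate_ordering(file_groups: &[Vec<&str>]) -> bool {
    file_groups.iter().all(|group| group.len() <= 1)
}

fn main() {
    // One file per partition: each partition's output stays sorted.
    let one_each = vec![
        vec!["1.parquet"],
        vec!["2.parquet"],
        vec!["3.parquet"],
        vec!["4.parquet"],
    ];
    assert!(can_propagate_ordering(&one_each));

    // Two files per partition: once the second file starts, key values can
    // repeat within the stream, so the declared ordering must be dropped.
    let two_each = vec![
        vec!["1.parquet", "3.parquet"],
        vec!["2.parquet", "4.parquet"],
    ];
    assert!(!can_propagate_ordering(&two_each));
}
```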
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 7a44749c8..fa65d7656 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -65,6 +65,8 @@ mod row_groups;
pub use metrics::ParquetFileMetrics;
+use super::get_output_ordering;
+
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -236,7 +238,7 @@ impl ExecutionPlan for ParquetExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ get_output_ordering(&self.base_config)
}
fn with_new_children(
@@ -301,24 +303,29 @@ impl ExecutionPlan for ParquetExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- if let Some(pre) = &self.pruning_predicate {
- write!(
- f,
- "ParquetExec: limit={:?}, partitions={}, predicate={}, projection={}",
- self.base_config.limit,
- super::FileGroupsDisplay(&self.base_config.file_groups),
- pre.predicate_expr(),
- super::ProjectSchemaDisplay(&self.projected_schema),
- )
- } else {
- write!(
- f,
- "ParquetExec: limit={:?}, partitions={}, projection={}",
- self.base_config.limit,
- super::FileGroupsDisplay(&self.base_config.file_groups),
- super::ProjectSchemaDisplay(&self.projected_schema),
- )
- }
+ let pruning_predicate_string = self
+ .pruning_predicate
+ .as_ref()
+ // TODO change this to be pruning_predicate rather than 'predicate'
+ // to avoid confusion
+ // https://github.com/apache/arrow-datafusion/issues/4020
+ .map(|pre| format!(", predicate={}", pre.predicate_expr()))
+ .unwrap_or_default();
+
+ let output_ordering_string = self
+ .output_ordering()
+ .map(make_output_ordering_string)
+ .unwrap_or_default();
+
+ write!(
+ f,
+ "ParquetExec: limit={:?}, partitions={}{}{}, projection={}",
+ self.base_config.limit,
+ super::FileGroupsDisplay(&self.base_config.file_groups),
+ pruning_predicate_string,
+ output_ordering_string,
+ super::ProjectSchemaDisplay(&self.projected_schema),
+ )
}
}
}
@@ -332,6 +339,20 @@ impl ExecutionPlan for ParquetExec {
}
}
+fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String {
+ use std::fmt::Write;
+ let mut w: String = ", output_ordering=[".into();
+
+ for (i, e) in ordering.iter().enumerate() {
+ if i > 0 {
+ write!(&mut w, ", ").unwrap()
+ }
+ write!(&mut w, "{}", e).unwrap()
+ }
+ write!(&mut w, "]").unwrap();
+ w
+}
+
/// Implements [`FormatReader`] for a parquet file
struct ParquetOpener {
partition_index: usize,
@@ -731,6 +752,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
predicate,
None,
@@ -1260,6 +1282,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
None,
None,
@@ -1362,6 +1385,7 @@ mod tests {
"day".to_owned(),
],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
None,
None,
@@ -1421,6 +1445,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
None,
None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index be27b2596..e36a46b6d 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -167,6 +167,7 @@ pub fn partitioned_csv_config(
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
})
}
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ded5fad02..b48061331 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -90,6 +90,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
None,
None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 2a8791b69..8d8c3bcae 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -70,6 +70,7 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
Some(filter),
None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 630c28a10..6567c8e75 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -107,6 +107,7 @@ async fn get_exec(
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
},
&[],
)
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 7a0db41f1..770ebd973 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -18,6 +18,7 @@
use std::{fs, path::Path};
use ::parquet::arrow::ArrowWriter;
+use datafusion::datasource::listing::ListingOptions;
use tempfile::TempDir;
use super::*;
@@ -48,6 +49,104 @@ async fn parquet_query() {
assert_batches_eq!(expected, &actual);
}
+#[tokio::test]
+/// Test that if sort order is specified in ListingOptions, the sort
+/// expressions make it all the way down to the ParquetExec
+async fn parquet_with_sort_order_specified() {
+ let parquet_read_options = ParquetReadOptions::default();
+ let target_partitions = 2;
+
+ // The sort order is not specified
+ let options_no_sort = ListingOptions {
+ file_sort_order: None,
+ ..parquet_read_options.to_listing_options(target_partitions)
+ };
+
+ // The sort order is specified (not actually correct in this case)
+ let file_sort_order = [col("string_col"), col("int_col")]
+ .into_iter()
+ .map(|e| {
+ let ascending = true;
+ let nulls_first = false;
+ e.sort(ascending, nulls_first)
+ })
+ .collect::<Vec<_>>();
+
+ let options_sort = ListingOptions {
+ file_sort_order: Some(file_sort_order),
+ ..parquet_read_options.to_listing_options(target_partitions)
+ };
+
+ // This string appears in ParquetExec if the output ordering is
+ // specified
+ let expected_output_ordering =
+ "output_ordering=[string_col@9 ASC NULLS LAST, int_col@4 ASC NULLS LAST]";
+
+ // when sort not specified, should not appear in the explain plan
+ let num_files = 1;
+ assert_not_contains!(
+ run_query_with_options(options_no_sort, num_files).await,
+ expected_output_ordering
+ );
+
+ // when sort IS specified, SHOULD appear in the explain plan
+ let num_files = 1;
+ assert_contains!(
+ run_query_with_options(options_sort.clone(), num_files).await,
+ expected_output_ordering
+ );
+
+ // when sort IS specified, but there are too many files (greater
+ // than the number of partitions) sort should not appear
+ let num_files = 3;
+ assert_not_contains!(
+ run_query_with_options(options_sort, num_files).await,
+ expected_output_ordering
+ );
+}
+
+/// Runs a limit query against a parquet table that was registered from
+/// `options` over `num_files` copies of alltypes_plain.parquet
+async fn run_query_with_options(options: ListingOptions, num_files: usize) -> String {
+ let ctx = SessionContext::new();
+
+ let testdata = datafusion::test_util::parquet_test_data();
+ let file_path = format!("{}/alltypes_plain.parquet", testdata);
+
+ // Create a directory of parquet files with names
+ // 0.parquet
+ // 1.parquet
+ let tmpdir = TempDir::new().unwrap();
+ for i in 0..num_files {
+ let target_file = tmpdir.path().join(format!("{i}.parquet"));
+ println!("Copying {file_path} to {target_file:?}");
+ std::fs::copy(&file_path, target_file).unwrap();
+ }
+
+ let provided_schema = None;
+ let sql_definition = None;
+ ctx.register_listing_table(
+ "t",
+ tmpdir.path().to_string_lossy(),
+ options.clone(),
+ provided_schema,
+ sql_definition,
+ )
+ .await
+ .unwrap();
+
+ let batches = ctx.sql("explain select int_col, string_col from t order by string_col, int_col limit 10")
+ .await
+ .expect("planning worked")
+ .collect()
+ .await
+ .expect("execution worked");
+
+ arrow::util::pretty::pretty_format_batches(&batches)
+ .unwrap()
+ .to_string()
+}
+
#[tokio::test]
async fn fixed_size_binary_columns() {
let ctx = SessionContext::new();
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index de5f3749d..48716c558 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -109,6 +109,7 @@ message ListingTableScanNode {
ParquetFormat parquet = 11;
AvroFormat avro = 12;
}
+ repeated datafusion.LogicalExprNode file_sort_order = 13;
}
message ViewTableScanNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index fe67b590a..6f9636f6b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6883,6 +6883,9 @@ impl serde::Serialize for ListingTableScanNode {
if self.target_partitions != 0 {
len += 1;
}
+ if !self.file_sort_order.is_empty() {
+ len += 1;
+ }
if self.file_format_type.is_some() {
len += 1;
}
@@ -6914,6 +6917,9 @@ impl serde::Serialize for ListingTableScanNode {
if self.target_partitions != 0 {
struct_ser.serialize_field("targetPartitions", &self.target_partitions)?;
}
+ if !self.file_sort_order.is_empty() {
+ struct_ser.serialize_field("fileSortOrder", &self.file_sort_order)?;
+ }
if let Some(v) = self.file_format_type.as_ref() {
match v {
listing_table_scan_node::FileFormatType::Csv(v) => {
@@ -6951,6 +6957,8 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
"collectStat",
"target_partitions",
"targetPartitions",
+ "file_sort_order",
+ "fileSortOrder",
"csv",
"parquet",
"avro",
@@ -6967,6 +6975,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
TablePartitionCols,
CollectStat,
TargetPartitions,
+ FileSortOrder,
Csv,
Parquet,
Avro,
@@ -7000,6 +7009,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
"tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
"collectStat" | "collect_stat" => Ok(GeneratedField::CollectStat),
"targetPartitions" | "target_partitions" => Ok(GeneratedField::TargetPartitions),
+ "fileSortOrder" | "file_sort_order" => Ok(GeneratedField::FileSortOrder),
"csv" => Ok(GeneratedField::Csv),
"parquet" => Ok(GeneratedField::Parquet),
"avro" => Ok(GeneratedField::Avro),
@@ -7031,6 +7041,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
let mut table_partition_cols__ = None;
let mut collect_stat__ = None;
let mut target_partitions__ = None;
+ let mut file_sort_order__ = None;
let mut file_format_type__ = None;
while let Some(k) = map.next_key()? {
match k {
@@ -7090,6 +7101,12 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
+ GeneratedField::FileSortOrder => {
+ if file_sort_order__.is_some() {
+ return Err(serde::de::Error::duplicate_field("fileSortOrder"));
+ }
+ file_sort_order__ = Some(map.next_value()?);
+ }
GeneratedField::Csv => {
if file_format_type__.is_some() {
return Err(serde::de::Error::duplicate_field("csv"));
@@ -7123,6 +7140,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
table_partition_cols: table_partition_cols__.unwrap_or_default(),
collect_stat: collect_stat__.unwrap_or_default(),
target_partitions: target_partitions__.unwrap_or_default(),
+ file_sort_order: file_sort_order__.unwrap_or_default(),
file_format_type: file_format_type__,
})
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 8962a7a8a..c84f9a179 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -132,6 +132,8 @@ pub struct ListingTableScanNode {
pub collect_stat: bool,
#[prost(uint32, tag="9")]
pub target_partitions: u32,
+ #[prost(message, repeated, tag="13")]
+ pub file_sort_order: ::prost::alloc::vec::Vec<LogicalExprNode>,
#[prost(oneof="listing_table_scan_node::FileFormatType", tags="10, 11, 12")]
pub file_format_type: ::core::option::Option<listing_table_scan_node::FileFormatType>,
}
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index ba0ed5eb9..f3c4c678b 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -416,6 +416,20 @@ impl AsLogicalPlan for LogicalPlanNode {
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
+ let file_sort_order = scan
+ .file_sort_order
+ .iter()
+ .map(|expr| parse_expr(expr, ctx))
+ .collect::<Result<Vec<_>, _>>()?;
+
+ // Protobuf doesn't distinguish between "not present"
+ // and empty
+ let file_sort_order = if file_sort_order.is_empty() {
+ None
+ } else {
+ Some(file_sort_order)
+ };
+
let file_format: Arc<dyn FileFormat> =
match scan.file_format_type.as_ref().ok_or_else(|| {
proto_error(format!(
@@ -451,6 +465,7 @@ impl AsLogicalPlan for LogicalPlanNode {
table_partition_cols: scan.table_partition_cols.clone(),
collect_stat: scan.collect_stat,
target_partitions: scan.target_partitions as usize,
+ file_sort_order,
};
let config =
@@ -871,18 +886,26 @@ impl AsLogicalPlan for LogicalPlanNode {
listing_table.options().format
)));
};
+
+ let options = listing_table.options();
+ let file_sort_order =
+ if let Some(file_sort_order) = &options.file_sort_order {
+ file_sort_order
+ .iter()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<protobuf::LogicalExprNode>, _>>()?
+ } else {
+ vec![]
+ };
+
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ListingScan(
protobuf::ListingTableScanNode {
file_format_type: Some(file_format_type),
table_name: table_name.to_owned(),
- collect_stat: listing_table.options().collect_stat,
- file_extension: listing_table
- .options()
- .file_extension
- .clone(),
- table_partition_cols: listing_table
- .options()
+ collect_stat: options.collect_stat,
+ file_extension: options.file_extension.clone(),
+ table_partition_cols: options
.table_partition_cols
.clone(),
paths: listing_table
@@ -893,10 +916,8 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: Some(schema),
projection,
filters,
- target_partitions: listing_table
- .options()
- .target_partitions
- as u32,
+ target_partitions: options.target_partitions as u32,
+ file_sort_order,
},
)),
})
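Since protobuf cannot distinguish an absent repeated field from an empty one, the code above maps `None` to an empty list on serialization and an empty list back to `None` on deserialization. Here is a tiny sketch of that convention, using a hypothetical stand-in `SortExpr` type rather than the generated protobuf types:

```rust
#[derive(Debug, Clone, PartialEq)]
struct SortExpr(String); // stand-in for a serialized sort expression

// Serialization: None becomes an empty repeated field.
fn to_proto(file_sort_order: &Option<Vec<SortExpr>>) -> Vec<SortExpr> {
    file_sort_order.clone().unwrap_or_default()
}

// Deserialization: an empty repeated field maps back to None.
fn from_proto(repeated: Vec<SortExpr>) -> Option<Vec<SortExpr>> {
    if repeated.is_empty() {
        None
    } else {
        Some(repeated)
    }
}

fn main() {
    let order = Some(vec![SortExpr("string_col ASC NULLS LAST".into())]);
    assert_eq!(from_proto(to_proto(&order)), order);

    // The one lossy case: Some(vec![]) round-trips to None.
    assert_eq!(from_proto(to_proto(&Some(vec![]))), None);
}
```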
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 41066584e..f1d06c46a 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -149,6 +149,7 @@ impl TestParquetFile {
limit: None,
table_partition_cols: vec![],
config_options: config_options.into_shareable(),
+ output_ordering: None,
};
let df_schema = self.schema.clone().to_dfschema_ref()?;