Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/06 11:41:33 UTC
[arrow-datafusion] branch master updated: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b7a33317c [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)
b7a33317c is described below
commit b7a33317c2abf265f4ab6b3fe636f87c4d01334c
Author: mingmwang <mi...@ebay.com>
AuthorDate: Sun Nov 6 19:41:27 2022 +0800
[Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)
* [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement
* Fix hash join output_partitioning
* fix
* fix format
* Resolve review comments
* tiny fix
* UT to verify hash join output_partitioning
* fix comments
---
datafusion/core/src/dataframe.rs | 162 +++++++++++++
.../core/src/physical_optimizer/merge_exec.rs | 35 ++-
.../core/src/physical_plan/aggregates/mod.rs | 63 ++++-
datafusion/core/src/physical_plan/analyze.rs | 8 +-
.../core/src/physical_plan/coalesce_batches.rs | 8 +-
.../core/src/physical_plan/coalesce_partitions.rs | 8 +-
datafusion/core/src/physical_plan/empty.rs | 6 +-
datafusion/core/src/physical_plan/explain.rs | 4 -
.../core/src/physical_plan/file_format/avro.rs | 4 -
.../core/src/physical_plan/file_format/csv.rs | 4 -
.../core/src/physical_plan/file_format/json.rs | 4 -
.../core/src/physical_plan/file_format/parquet.rs | 4 -
datafusion/core/src/physical_plan/filter.rs | 91 +++++++-
.../core/src/physical_plan/joins/cross_join.rs | 28 ++-
.../core/src/physical_plan/joins/hash_join.rs | 90 ++++++--
.../src/physical_plan/joins/sort_merge_join.rs | 147 ++++++++++--
datafusion/core/src/physical_plan/joins/utils.rs | 129 ++++++++++-
datafusion/core/src/physical_plan/limit.rs | 24 +-
datafusion/core/src/physical_plan/memory.rs | 4 -
datafusion/core/src/physical_plan/mod.rs | 59 ++++-
datafusion/core/src/physical_plan/planner.rs | 40 ++--
datafusion/core/src/physical_plan/projection.rs | 80 ++++++-
datafusion/core/src/physical_plan/repartition.rs | 12 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 18 +-
.../physical_plan/sorts/sort_preserving_merge.rs | 13 +-
datafusion/core/src/physical_plan/union.rs | 179 ++++++++++++--
datafusion/core/src/physical_plan/values.rs | 13 +-
datafusion/core/src/physical_plan/windows/mod.rs | 4 +
.../src/physical_plan/windows/window_agg_exec.rs | 39 +++-
.../core/src/scheduler/pipeline/execution.rs | 4 +-
datafusion/core/tests/user_defined_plan.rs | 8 +-
datafusion/physical-expr/src/equivalence.rs | 256 +++++++++++++++++++++
datafusion/physical-expr/src/expressions/column.rs | 70 ++++++
datafusion/physical-expr/src/expressions/mod.rs | 2 +-
datafusion/physical-expr/src/lib.rs | 9 +
datafusion/physical-expr/src/utils.rs | 142 +++++++++++-
36 files changed, 1537 insertions(+), 234 deletions(-)
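The heart of this change is the ExecutionPlan trait itself: the single required_child_distribution() is replaced by a per-child required_input_distribution() and a new per-child required_input_ordering(), relies_on_input_order() is now derived from the ordering requirements, and operators can expose equivalence_properties() (see the physical_plan/mod.rs hunk below). The following is a minimal, self-contained mirror of that new surface for illustration only; every type here is a simplified stand-in, not the real DataFusion one.

// Stand-in for datafusion's Distribution enum.
#[derive(Clone, Debug)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

trait ExecutionPlanLike {
    fn num_children(&self) -> usize;

    // One distribution requirement per child (previously a single value
    // covering all children).
    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.num_children() > 0 {
            vec![Distribution::UnspecifiedDistribution; self.num_children()]
        } else {
            vec![Distribution::UnspecifiedDistribution]
        }
    }

    // One optional ordering requirement per child; None means "no requirement".
    // Stand-in: the real trait returns Vec<Option<&[PhysicalSortExpr]>>.
    fn required_input_ordering(&self) -> Vec<Option<Vec<String>>> {
        vec![None; self.num_children()]
    }

    // Now derived: an operator relies on its input order exactly when it
    // declares a non-empty ordering requirement for some child.
    fn relies_on_input_order(&self) -> bool {
        self.required_input_ordering().iter().any(Option::is_some)
    }
}

// E.g. a sort-preserving merge requires each child to be ordered.
struct MergeLike;
impl ExecutionPlanLike for MergeLike {
    fn num_children(&self) -> usize { 1 }
    fn required_input_ordering(&self) -> Vec<Option<Vec<String>>> {
        vec![Some(vec!["a ASC".to_string()])]
    }
}

// A filter-like operator keeps the defaults: no requirement on any child.
struct FilterLike;
impl ExecutionPlanLike for FilterLike {
    fn num_children(&self) -> usize { 1 }
}

fn main() {
    assert!(MergeLike.relies_on_input_order());   // declares an ordering requirement
    assert!(!FilterLike.relies_on_input_order()); // default: no requirement
}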
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index aa5958fe6..3d1c8d009 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -842,6 +842,8 @@ mod tests {
use super::*;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
+ use crate::physical_plan::Partitioning;
+ use crate::physical_plan::PhysicalExpr;
use crate::test_util;
use crate::test_util::parquet_test_data;
use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
@@ -851,6 +853,7 @@ mod tests {
avg, cast, count, count_distinct, create_udf, lit, max, min, sum,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction,
};
+ use datafusion_physical_expr::expressions::Column;
#[tokio::test]
async fn select_columns() -> Result<()> {
@@ -1515,4 +1518,163 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn partition_aware_union() -> Result<()> {
+ let left = test_table().await?.select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("c2")
+ .await?
+ .select_columns(&["c1", "c3"])?
+ .with_column_renamed("c2.c1", "c2_c1")?;
+
+ let left_rows = left.collect().await?;
+ let right_rows = right.collect().await?;
+ let join1 =
+ left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+ let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+
+ let union = join1.union(join2)?;
+
+ let union_rows = union.collect().await?;
+
+ assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+ assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+ assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+ let physical_plan = union.create_physical_plan().await?;
+ let default_partition_count =
+ SessionContext::new().copied_config().target_partitions;
+
+ // For a partition aware union, the output partition count should not be changed.
+ assert_eq!(
+ physical_plan.output_partitioning().partition_count(),
+ default_partition_count
+ );
+ // For a partition aware union, the output partitioning is the same as the union's inputs
+ for child in physical_plan.children() {
+ assert_eq!(
+ physical_plan.output_partitioning(),
+ child.output_partitioning()
+ );
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn non_partition_aware_union() -> Result<()> {
+ let left = test_table().await?.select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("c2")
+ .await?
+ .select_columns(&["c1", "c2"])?
+ .with_column_renamed("c2.c1", "c2_c1")?
+ .with_column_renamed("c2.c2", "c2_c2")?;
+
+ let left_rows = left.collect().await?;
+ let right_rows = right.collect().await?;
+ let join1 = left.join(
+ right.clone(),
+ JoinType::Inner,
+ &["c1", "c2"],
+ &["c2_c1", "c2_c2"],
+ None,
+ )?;
+
+ // join key ordering is different
+ let join2 = left.join(
+ right,
+ JoinType::Inner,
+ &["c2", "c1"],
+ &["c2_c2", "c2_c1"],
+ None,
+ )?;
+
+ let union = join1.union(join2)?;
+
+ let union_rows = union.collect().await?;
+
+ assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+ assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+ assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+ let physical_plan = union.create_physical_plan().await?;
+ let default_partition_count =
+ SessionContext::new().copied_config().target_partitions;
+
+ // For a non-partition aware union, the output partition count is the sum of all the inputs' partition counts
+ assert!(matches!(
+ physical_plan.output_partitioning(),
+ Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn verify_join_output_partitioning() -> Result<()> {
+ let left = test_table().await?.select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("c2")
+ .await?
+ .select_columns(&["c1", "c2"])?
+ .with_column_renamed("c2.c1", "c2_c1")?
+ .with_column_renamed("c2.c2", "c2_c2")?;
+
+ let all_join_types = vec![
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::LeftSemi,
+ JoinType::RightSemi,
+ JoinType::LeftAnti,
+ JoinType::RightAnti,
+ ];
+
+ let default_partition_count =
+ SessionContext::new().copied_config().target_partitions;
+
+ for join_type in all_join_types {
+ let join = left.join(
+ right.clone(),
+ join_type,
+ &["c1", "c2"],
+ &["c2_c1", "c2_c2"],
+ None,
+ )?;
+ let physical_plan = join.create_physical_plan().await?;
+ let out_partitioning = physical_plan.output_partitioning();
+ let join_schema = physical_plan.schema();
+
+ match join_type {
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti => {
+ let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
+ Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
+ ];
+ assert_eq!(
+ out_partitioning,
+ Partitioning::Hash(left_exprs, default_partition_count)
+ );
+ }
+ JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
+ let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()),
+ Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()),
+ ];
+ assert_eq!(
+ out_partitioning,
+ Partitioning::Hash(right_exprs, default_partition_count)
+ );
+ }
+ JoinType::Full => {
+ assert!(matches!(
+ out_partitioning,
+ Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
+ }
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs
index 960d0582f..1ed0f6196 100644
--- a/datafusion/core/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/core/src/physical_optimizer/merge_exec.rs
@@ -52,27 +52,20 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
.iter()
.map(|child| self.optimize(child.clone(), _config))
.collect::<Result<Vec<_>>>()?;
- match plan.required_child_distribution() {
- Distribution::UnspecifiedDistribution => {
- with_new_children_if_necessary(plan, children)
- }
- Distribution::HashPartitioned(_) => {
- with_new_children_if_necessary(plan, children)
- }
- Distribution::SinglePartition => with_new_children_if_necessary(
- plan,
- children
- .iter()
- .map(|child| {
- if child.output_partitioning().partition_count() == 1 {
- child.clone()
- } else {
- Arc::new(CoalescePartitionsExec::new(child.clone()))
- }
- })
- .collect(),
- ),
- }
+ assert_eq!(children.len(), plan.required_input_distribution().len());
+ let new_children = children
+ .into_iter()
+ .zip(plan.required_input_distribution())
+ .map(|(child, dist)| match dist {
+ Distribution::SinglePartition
+ if child.output_partitioning().partition_count() > 1 =>
+ {
+ Arc::new(CoalescePartitionsExec::new(child.clone()))
+ }
+ _ => child,
+ })
+ .collect::<Vec<_>>();
+ with_new_children_if_necessary(plan, new_children)
}
}
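Restated outside the optimizer for illustration, the rule above now pairs each child with its own distribution requirement and coalesces only the children that need it. Everything below is a simplified stand-in for the DataFusion types, a sketch rather than the real rule.

#[derive(Clone, Copy)]
enum Dist { Unspecified, Single }

#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Leaf { partitions: usize },
    // Stand-in for wrapping a child in CoalescePartitionsExec.
    Coalesce(Box<Plan>),
}

impl Plan {
    fn partition_count(&self) -> usize {
        match self {
            Plan::Leaf { partitions } => *partitions,
            Plan::Coalesce(_) => 1,
        }
    }
}

fn enforce(children: Vec<Plan>, required: Vec<Dist>) -> Vec<Plan> {
    assert_eq!(children.len(), required.len());
    children
        .into_iter()
        .zip(required)
        .map(|(child, dist)| match dist {
            // Coalesce only when a single partition is required but the
            // child produces several.
            Dist::Single if child.partition_count() > 1 => {
                Plan::Coalesce(Box::new(child))
            }
            _ => child,
        })
        .collect()
}

fn main() {
    let out = enforce(
        vec![Plan::Leaf { partitions: 8 }, Plan::Leaf { partitions: 8 }],
        vec![Dist::Single, Dist::Unspecified],
    );
    assert_eq!(out[0].partition_count(), 1); // coalesced to a single partition
    assert_eq!(out[1].partition_count(), 8); // no requirement, left as-is
}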
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index e453b65cc..6fda8f77b 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -37,6 +37,7 @@ use datafusion_physical_expr::{
expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
};
use std::any::Any;
+use std::collections::HashMap;
use std::sync::Arc;
@@ -45,9 +46,11 @@ mod no_grouping;
mod row_hash;
use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2;
+use crate::physical_plan::EquivalenceProperties;
pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
+use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
use datafusion_row::{row_supported, RowType};
/// Hash aggregate modes
@@ -163,6 +166,9 @@ pub struct AggregateExec {
/// same as input.schema() but for the final aggregate it will be the same as the input
/// to the partial aggregate
input_schema: SchemaRef,
+ /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr
+ /// The key is the column from the input schema and the values are the columns from the output schema
+ alias_map: HashMap<Column, Vec<Column>>,
/// Execution Metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -186,6 +192,18 @@ impl AggregateExec {
let schema = Arc::new(schema);
+ let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+ for (expression, name) in group_by.expr.iter() {
+ if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+ let new_col_idx = schema.index_of(name)?;
+ // Even when the column name is unchanged, a different index means the column acts as an alias
+ if (column.name() != name) || (column.index() != new_col_idx) {
+ let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new);
+ entry.push(Column::new(name, new_col_idx));
+ }
+ };
+ }
+
Ok(AggregateExec {
mode,
group_by,
@@ -193,6 +211,7 @@ impl AggregateExec {
input,
schema,
input_schema,
+ alias_map,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -255,25 +274,51 @@ impl ExecutionPlan for AggregateExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ match &self.mode {
+ AggregateMode::Partial => {
+ // A Partial aggregation will not change the output partitioning, but it needs to respect the aliases
+ let input_partition = self.input.output_partitioning();
+ match input_partition {
+ Partitioning::Hash(exprs, part) => {
+ let normalized_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ normalize_out_expr_with_alias_schema(
+ expr,
+ &self.alias_map,
+ &self.schema,
+ )
+ })
+ .collect::<Vec<_>>();
+ Partitioning::Hash(normalized_exprs, part)
+ }
+ _ => input_partition,
+ }
+ }
+ // A Final aggregation's output partitioning is the same as its input's
+ _ => self.input.output_partitioning(),
+ }
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
- fn required_child_distribution(&self) -> Distribution {
+ fn required_input_distribution(&self) -> Vec<Distribution> {
match &self.mode {
- AggregateMode::Partial => Distribution::UnspecifiedDistribution,
- AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
- self.group_by.expr.iter().map(|x| x.0.clone()).collect(),
- ),
- AggregateMode::Final => Distribution::SinglePartition,
+ AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
+ AggregateMode::FinalPartitioned => {
+ vec![Distribution::HashPartitioned(self.output_group_expr())]
+ }
+ AggregateMode::Final => vec![Distribution::SinglePartition],
}
}
- fn relies_on_input_order(&self) -> bool {
- false
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ let mut input_equivalence_properties = self.input.equivalence_properties();
+ input_equivalence_properties.merge_properties_with_alias(&self.alias_map);
+ input_equivalence_properties.truncate_properties_not_in_schema(&self.schema);
+ input_equivalence_properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
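For illustration, here is a small self-contained sketch of the alias-map construction used above, for a hypothetical query SELECT a AS a1 ... GROUP BY a. The real code iterates Arc<dyn PhysicalExpr> group expressions and downcasts each one to Column, which is elided here.

use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::Column;

fn main() {
    // Output schema of the hypothetical `SELECT a AS a1 ... GROUP BY a`.
    let schema = Arc::new(Schema::new(vec![Field::new("a1", DataType::Int64, false)]));

    // Group expression `a` (input column 0) exposed under the output name "a1".
    let group_expr = vec![(Column::new("a", 0), "a1".to_string())];

    let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
    for (column, name) in &group_expr {
        let new_col_idx = schema.index_of(name).unwrap();
        // A changed name or a changed index both mean the column is aliased.
        if column.name() != name || column.index() != new_col_idx {
            alias_map
                .entry(column.clone())
                .or_default()
                .push(Column::new(name, new_col_idx));
        }
    }

    // `a@0` now maps to `a1@0`, so a Hash partitioning on `a` can be
    // normalized to the output column `a1`.
    assert_eq!(alias_map[&Column::new("a", 0)], vec![Column::new("a1", 0)]);
}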
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 8134ee7d2..b0578bd48 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -72,8 +72,8 @@ impl ExecutionPlan for AnalyzeExec {
}
/// Specifies we want the input as a single stream
- fn required_child_distribution(&self) -> Distribution {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::SinglePartition]
}
/// Get the output partitioning of this plan
@@ -85,10 +85,6 @@ impl ExecutionPlan for AnalyzeExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 317500ddc..e7c492732 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -25,8 +25,8 @@ use std::task::{Context, Poll};
use crate::error::Result;
use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream,
+ DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+ RecordBatchStream, SendableRecordBatchStream,
};
use crate::execution::context::TaskContext;
@@ -100,8 +100,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
}
fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index d1c797eac..816a9c940 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -33,7 +33,9 @@ use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, Statistics};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use crate::physical_plan::{
+ DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+};
use super::SendableRecordBatchStream;
use crate::execution::context::TaskContext;
@@ -87,8 +89,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
}
fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index c693764c8..4751dade1 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
- memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning,
};
use arrow::array::NullArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -98,10 +98,6 @@ impl ExecutionPlan for EmptyExec {
vec![]
}
- fn required_child_distribution(&self) -> Distribution {
- Distribution::UnspecifiedDistribution
- }
-
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.partitions)
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index 15f459fb0..ac350b183 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -97,10 +97,6 @@ impl ExecutionPlan for ExplainExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 2aab84fad..38178a976 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -76,10 +76,6 @@ impl ExecutionPlan for AvroExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Vec::new()
}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index d086a7798..51180c0f0 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -109,10 +109,6 @@ impl ExecutionPlan for CsvExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index c8c5d71bd..ceb9e7958 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -91,10 +91,6 @@ impl ExecutionPlan for NdJsonExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Vec::new()
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index c803dabd0..1f98dd88c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -251,10 +251,6 @@ impl ExecutionPlan for ParquetExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index b4e3edaee..17a2355d0 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -28,13 +28,17 @@ use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
- DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+ Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+ PhysicalExpr,
};
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::split_conjunction;
use log::debug;
@@ -113,8 +117,14 @@ impl ExecutionPlan for FilterExec {
true
}
- fn relies_on_input_order(&self) -> bool {
- false
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Combine the equal predicates with the input equivalence properties
+ let mut input_properties = self.input.equivalence_properties();
+ let (equal_pairs, _ne_pairs) = collect_columns_from_predicate(&self.predicate);
+ for new_condition in equal_pairs {
+ input_properties.add_equal_conditions(new_condition)
+ }
+ input_properties
}
fn with_new_children(
@@ -231,6 +241,38 @@ impl RecordBatchStream for FilterExecStream {
}
}
+/// Return the equal column pairs and the non-equal column pairs found in the predicate
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
+ let mut eq_predicate_columns: Vec<(&Column, &Column)> = Vec::new();
+ let mut ne_predicate_columns: Vec<(&Column, &Column)> = Vec::new();
+
+ let predicates = split_conjunction(predicate);
+ predicates.into_iter().for_each(|p| {
+ if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
+ let left = binary.left();
+ let right = binary.right();
+ if left.as_any().is::<Column>() && right.as_any().is::<Column>() {
+ let left_column = left.as_any().downcast_ref::<Column>().unwrap();
+ let right_column = right.as_any().downcast_ref::<Column>().unwrap();
+ match binary.op() {
+ Operator::Eq => {
+ eq_predicate_columns.push((left_column, right_column))
+ }
+ Operator::NotEq => {
+ ne_predicate_columns.push((left_column, right_column))
+ }
+ _ => {}
+ }
+ }
+ }
+ });
+
+ (eq_predicate_columns, ne_predicate_columns)
+}
+/// The equal column pairs and the non-equal column pairs found in the predicates
+pub type EqualAndNonEqual<'a> =
+ (Vec<(&'a Column, &'a Column)>, Vec<(&'a Column, &'a Column)>);
+
#[cfg(test)]
mod tests {
@@ -295,4 +337,47 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn collect_columns_predicates() -> Result<()> {
+ let schema = test_util::aggr_test_schema();
+ let predicate: Arc<dyn PhysicalExpr> = binary(
+ binary(
+ binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
+ Operator::And,
+ binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
+ &schema,
+ )?,
+ Operator::And,
+ binary(
+ binary(
+ col("c2", &schema)?,
+ Operator::Eq,
+ col("c9", &schema)?,
+ &schema,
+ )?,
+ Operator::And,
+ binary(
+ col("c1", &schema)?,
+ Operator::NotEq,
+ col("c13", &schema)?,
+ &schema,
+ )?,
+ &schema,
+ )?,
+ &schema,
+ )?;
+
+ let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
+
+ assert_eq!(1, equal_pairs.len());
+ assert_eq!(equal_pairs[0].0.name(), "c2");
+ assert_eq!(equal_pairs[0].1.name(), "c9");
+
+ assert_eq!(1, ne_pairs.len());
+ assert_eq!(ne_pairs[0].0.name(), "c1");
+ assert_eq!(ne_pairs[0].1.name(), "c13");
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 7a35116a4..a71e06cce 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -29,16 +29,19 @@ use arrow::record_batch::RecordBatch;
use crate::execution::context::TaskContext;
use crate::physical_plan::{
coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
- ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
+ Partitioning, PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream,
+ Statistics,
};
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
-use datafusion_physical_expr::PhysicalSortExpr;
use log::debug;
use std::time::Instant;
-use super::utils::{check_join_is_valid, OnceAsync, OnceFut};
+use super::utils::{
+ adjust_right_output_partitioning, check_join_is_valid,
+ cross_join_equivalence_properties, OnceAsync, OnceFut,
+};
/// Data of the left side
type JoinLeftData = RecordBatch;
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
)?))
}
+ // TODO optimize CrossJoin implementation to generate M * N partitions
fn output_partitioning(&self) -> Partitioning {
- self.right.output_partitioning()
+ let left_columns_len = self.left.schema().fields.len();
+ adjust_right_output_partitioning(
+ self.right.output_partitioning(),
+ left_columns_len,
+ )
}
+ // TODO check the output ordering of CrossJoin
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ let left_columns_len = self.left.schema().fields.len();
+ cross_join_equivalence_properties(
+ self.left.equivalence_properties(),
+ self.right.equivalence_properties(),
+ left_columns_len,
+ )
}
fn execute(
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 839a24112..07b4d6f4a 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -62,12 +62,13 @@ use crate::physical_plan::{
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
joins::utils::{
- build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
- JoinFilter, JoinOn, JoinSide,
+ adjust_right_output_partitioning, build_join_schema, check_join_is_valid,
+ combine_join_equivalence_properties, estimate_join_statistics,
+ partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn, JoinSide,
},
metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
+ PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::error::{DataFusionError, Result};
@@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec {
self.schema.clone()
}
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ match self.mode {
+ PartitionMode::CollectLeft => vec![
+ Distribution::SinglePartition,
+ Distribution::UnspecifiedDistribution,
+ ],
+ PartitionMode::Partitioned => {
+ let (left_expr, right_expr) = self
+ .on
+ .iter()
+ .map(|(l, r)| {
+ (
+ Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+ Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+ )
+ })
+ .unzip();
+ vec![
+ Distribution::HashPartitioned(left_expr),
+ Distribution::HashPartitioned(right_expr),
+ ]
+ }
+ }
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ let left_columns_len = self.left.schema().fields.len();
+ match self.mode {
+ PartitionMode::CollectLeft => match self.join_type {
+ JoinType::Inner | JoinType::Right => adjust_right_output_partitioning(
+ self.right.output_partitioning(),
+ left_columns_len,
+ ),
+ JoinType::RightSemi | JoinType::RightAnti => {
+ self.right.output_partitioning()
+ }
+ JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti
+ | JoinType::Full => Partitioning::UnknownPartitioning(
+ self.right.output_partitioning().partition_count(),
+ ),
+ },
+ PartitionMode::Partitioned => partitioned_join_output_partitioning(
+ self.join_type,
+ self.left.output_partitioning(),
+ self.right.output_partitioning(),
+ left_columns_len,
+ ),
+ }
+ }
+
+ // TODO Output ordering might be kept in some cases.
+ // For example, for an inner join the order of the stream side can be kept
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ let left_columns_len = self.left.schema().fields.len();
+ combine_join_equivalence_properties(
+ self.join_type,
+ self.left.equivalence_properties(),
+ self.right.equivalence_properties(),
+ left_columns_len,
+ self.on(),
+ )
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.left.clone(), self.right.clone()]
}
@@ -289,18 +359,6 @@ impl ExecutionPlan for HashJoinExec {
)?))
}
- fn output_partitioning(&self) -> Partitioning {
- self.right.output_partitioning()
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn execute(
&self,
partition: usize,
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 92392455e..44771ba4c 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -43,14 +43,17 @@ use crate::physical_plan::common::combine_batches;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::joins::utils::{
- build_join_schema, check_join_is_valid, JoinOn,
+ build_join_schema, check_join_is_valid, combine_join_equivalence_properties,
+ partitioned_join_output_partitioning, JoinOn,
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use crate::physical_plan::{
- metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
+ Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
+
/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
#[derive(Debug)]
@@ -67,6 +70,12 @@ pub struct SortMergeJoinExec {
schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ /// The left SortExpr
+ left_sort_exprs: Vec<PhysicalSortExpr>,
+ /// The right SortExpr
+ right_sort_exprs: Vec<PhysicalSortExpr>,
+ /// The output ordering
+ output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Sort options of join columns used in sorting left and right execution plans
sort_options: Vec<SortOptions>,
/// If null_equals_null is true, null == null else null != null
@@ -104,6 +113,75 @@ impl SortMergeJoinExec {
)));
}
+ let (left_expr, right_expr): (Vec<_>, Vec<_>) = on
+ .iter()
+ .map(|(l, r)| {
+ (
+ Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+ Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+ )
+ })
+ .unzip();
+
+ let left_sort_exprs = left_expr
+ .into_iter()
+ .zip(sort_options.iter())
+ .map(|(k, sort_op)| PhysicalSortExpr {
+ expr: k,
+ options: *sort_op,
+ })
+ .collect::<Vec<_>>();
+
+ let right_sort_exprs = right_expr
+ .into_iter()
+ .zip(sort_options.iter())
+ .map(|(k, sort_op)| PhysicalSortExpr {
+ expr: k,
+ options: *sort_op,
+ })
+ .collect::<Vec<_>>();
+
+ let output_ordering = match join_type {
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti => {
+ left.output_ordering().map(|sort_exprs| sort_exprs.to_vec())
+ }
+ JoinType::RightSemi | JoinType::RightAnti => right
+ .output_ordering()
+ .map(|sort_exprs| sort_exprs.to_vec()),
+ JoinType::Right => {
+ let left_columns_len = left.schema().fields.len();
+ right.output_ordering().map(|sort_exprs| {
+ sort_exprs
+ .iter()
+ .map(|e| {
+ let new_expr = e
+ .expr
+ .clone()
+ .transform_down(&|e| match e
+ .as_any()
+ .downcast_ref::<Column>()
+ {
+ Some(col) => Some(Arc::new(Column::new(
+ col.name(),
+ left_columns_len + col.index(),
+ ))),
+ None => None,
+ })
+ .unwrap();
+ PhysicalSortExpr {
+ expr: new_expr,
+ options: e.options,
+ }
+ })
+ .collect::<Vec<_>>()
+ })
+ }
+ JoinType::Full => None,
+ };
+
let schema =
Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0);
@@ -114,10 +192,18 @@ impl SortMergeJoinExec {
join_type,
schema,
metrics: ExecutionPlanMetricsSet::new(),
+ left_sort_exprs,
+ right_sort_exprs,
+ output_ordering,
sort_options,
null_equals_null,
})
}
+
+ /// Set of common columns used to join on
+ pub fn on(&self) -> &[(Column, Column)] {
+ &self.on
+ }
}
impl ExecutionPlan for SortMergeJoinExec {
@@ -129,25 +215,50 @@ impl ExecutionPlan for SortMergeJoinExec {
self.schema.clone()
}
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ let (left_expr, right_expr) = self
+ .on
+ .iter()
+ .map(|(l, r)| {
+ (
+ Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+ Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+ )
+ })
+ .unzip();
+ vec![
+ Distribution::HashPartitioned(left_expr),
+ Distribution::HashPartitioned(right_expr),
+ ]
+ }
+
+ fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+ vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)]
+ }
+
fn output_partitioning(&self) -> Partitioning {
- self.right.output_partitioning()
+ let left_columns_len = self.left.schema().fields.len();
+ partitioned_join_output_partitioning(
+ self.join_type,
+ self.left.output_partitioning(),
+ self.right.output_partitioning(),
+ left_columns_len,
+ )
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- match self.join_type {
- JoinType::Inner
- | JoinType::Left
- | JoinType::LeftSemi
- | JoinType::LeftAnti => self.left.output_ordering(),
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
- self.right.output_ordering()
- }
- JoinType::Full => None,
- }
+ self.output_ordering.as_deref()
}
- fn relies_on_input_order(&self) -> bool {
- true
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ let left_columns_len = self.left.schema().fields.len();
+ combine_join_equivalence_properties(
+ self.join_type,
+ self.left.equivalence_properties(),
+ self.right.equivalence_properties(),
+ left_columns_len,
+ self.on(),
+ )
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -228,8 +339,8 @@ impl ExecutionPlan for SortMergeJoinExec {
DisplayFormatType::Default => {
write!(
f,
- "SortMergeJoin: join_type={:?}, on={:?}, schema={:?}",
- self.join_type, self.on, &self.schema
+ "SortMergeJoin: join_type={:?}, on={:?}",
+ self.join_type, self.on
)
}
}
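The per-child ordering requirement above is assembled by pairing each join key with the join's SortOptions. A minimal sketch of that assembly, with made-up column names and indexes:

use std::sync::Arc;

use arrow::compute::SortOptions;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

fn main() {
    // Hypothetical join keys on the left side: c1@0 and c2@1.
    let on_left = vec![Column::new("c1", 0), Column::new("c2", 1)];
    // One SortOptions entry per join key (arrow defaults: ascending, nulls first).
    let sort_options = vec![SortOptions::default(); on_left.len()];

    let left_sort_exprs: Vec<PhysicalSortExpr> = on_left
        .into_iter()
        .zip(sort_options.iter())
        .map(|(col, opts)| PhysicalSortExpr {
            expr: Arc::new(col) as Arc<dyn PhysicalExpr>,
            options: *opts,
        })
        .collect();

    // These become the requirement for the left child; the right side is
    // built the same way from the right-hand join keys.
    assert_eq!(left_sort_exprs.len(), 2);
}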
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index d041e7dfb..905e59de9 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -23,7 +23,7 @@ use crate::physical_plan::expressions::Column;
use arrow::datatypes::{Field, Schema};
use arrow::error::ArrowError;
use datafusion_common::ScalarValue;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{EquivalentClass, PhysicalExpr};
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
use parking_lot::Mutex;
@@ -33,7 +33,10 @@ use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::physical_plan::{
+ ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics,
+};
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
/// The on clause of the join, as vector of (left, right) columns.
pub type JoinOn = Vec<(Column, Column)>;
@@ -83,6 +86,128 @@ fn check_join_set_is_valid(
Ok(())
}
+/// Calculate the OutputPartitioning for Partitioned Join
+pub fn partitioned_join_output_partitioning(
+ join_type: JoinType,
+ left_partitioning: Partitioning,
+ right_partitioning: Partitioning,
+ left_columns_len: usize,
+) -> Partitioning {
+ match join_type {
+ JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
+ left_partitioning
+ }
+ JoinType::RightSemi | JoinType::RightAnti => right_partitioning,
+ JoinType::Right => {
+ adjust_right_output_partitioning(right_partitioning, left_columns_len)
+ }
+ JoinType::Full => {
+ Partitioning::UnknownPartitioning(right_partitioning.partition_count())
+ }
+ }
+}
+
+/// Adjust the right output partitioning to the columns' new indexes in the join output schema
+pub fn adjust_right_output_partitioning(
+ right_partitioning: Partitioning,
+ left_columns_len: usize,
+) -> Partitioning {
+ match right_partitioning {
+ Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
+ Partitioning::UnknownPartitioning(size) => {
+ Partitioning::UnknownPartitioning(size)
+ }
+ Partitioning::Hash(exprs, size) => {
+ let new_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+ Some(col) => Some(Arc::new(Column::new(
+ col.name(),
+ left_columns_len + col.index(),
+ ))),
+ None => None,
+ })
+ .unwrap()
+ })
+ .collect::<Vec<_>>();
+ Partitioning::Hash(new_exprs, size)
+ }
+ }
+}
+
+/// Combine the Equivalence Properties for Join Node
+pub fn combine_join_equivalence_properties(
+ join_type: JoinType,
+ left_properties: EquivalenceProperties,
+ right_properties: EquivalenceProperties,
+ left_columns_len: usize,
+ on: &[(Column, Column)],
+) -> EquivalenceProperties {
+ let mut new_properties = match join_type {
+ JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
+ let mut left_properties = left_properties;
+ let new_right_properties = right_properties
+ .classes()
+ .iter()
+ .map(|prop| {
+ let new_head = Column::new(
+ prop.head().name(),
+ left_columns_len + prop.head().index(),
+ );
+ let new_others = prop
+ .others()
+ .iter()
+ .map(|col| {
+ Column::new(col.name(), left_columns_len + col.index())
+ })
+ .collect::<Vec<_>>();
+ EquivalentClass::new(new_head, new_others)
+ })
+ .collect::<Vec<_>>();
+
+ left_properties.extend(new_right_properties);
+ left_properties
+ }
+ JoinType::LeftSemi | JoinType::LeftAnti => left_properties,
+ JoinType::RightSemi | JoinType::RightAnti => right_properties,
+ };
+
+ if join_type == JoinType::Inner {
+ on.iter().for_each(|(column1, column2)| {
+ let new_column2 =
+ Column::new(column2.name(), left_columns_len + column2.index());
+ new_properties.add_equal_conditions((column1, &new_column2))
+ })
+ }
+ new_properties
+}
+
+/// Calculate the Equivalence Properties for CrossJoin Node
+pub fn cross_join_equivalence_properties(
+ left_properties: EquivalenceProperties,
+ right_properties: EquivalenceProperties,
+ left_columns_len: usize,
+) -> EquivalenceProperties {
+ let mut left_properties = left_properties;
+ let new_right_properties = right_properties
+ .classes()
+ .iter()
+ .map(|prop| {
+ let new_head =
+ Column::new(prop.head().name(), left_columns_len + prop.head().index());
+ let new_others = prop
+ .others()
+ .iter()
+ .map(|col| Column::new(col.name(), left_columns_len + col.index()))
+ .collect::<Vec<_>>();
+ EquivalentClass::new(new_head, new_others)
+ })
+ .collect::<Vec<_>>();
+ left_properties.extend(new_right_properties);
+ left_properties
+}
+
/// Used in ColumnIndex to distinguish which side the index is for
#[derive(Debug, Clone)]
pub enum JoinSide {
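Assuming these helpers stay publicly reachable under datafusion::physical_plan::joins::utils, here is a usage sketch of adjust_right_output_partitioning: a right-side Hash expression on column b at index 1, combined with a 3-column left side, is rewritten to index 3 + 1 = 4 in the join output.

use std::sync::Arc;

use datafusion::physical_plan::joins::utils::adjust_right_output_partitioning;
use datafusion::physical_plan::{Partitioning, PhysicalExpr};
use datafusion_physical_expr::expressions::Column;

fn main() {
    // Right side hashed on its own column `b` at index 1.
    let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("b", 1))];
    // The left side of the join has 3 columns.
    let adjusted = adjust_right_output_partitioning(Partitioning::Hash(right_exprs, 8), 3);

    // In the join output, `b` sits at index 4; the partition count is unchanged.
    let expected: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("b", 4))];
    assert_eq!(adjusted, Partitioning::Hash(expected, 8));
}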
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 322c21ff4..171e2b2f0 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -27,7 +27,7 @@ use std::task::{Context, Poll};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
- DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
};
use arrow::array::ArrayRef;
use arrow::compute::limit;
@@ -98,10 +98,9 @@ impl ExecutionPlan for GlobalLimitExec {
vec![self.input.clone()]
}
- fn required_child_distribution(&self) -> Distribution {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::SinglePartition]
}
-
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
@@ -123,6 +122,10 @@ impl ExecutionPlan for GlobalLimitExec {
self.input.output_ordering()
}
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -281,14 +284,13 @@ impl ExecutionPlan for LocalLimitExec {
false
}
- // Local limit does not make any attempt to maintain the input
- // sortedness (if there is more than one partition)
+ // Local limit will not change the input plan's ordering
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- if self.output_partitioning().partition_count() == 1 {
- self.input.output_ordering()
- } else {
- None
- }
+ self.input.output_ordering()
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
}
fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 698eaf12a..7753a5ba7 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -81,10 +81,6 @@ impl ExecutionPlan for MemoryExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 55b46c991..87df26781 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// have any particular output order here
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
- /// Specifies the data distribution requirements of all the
- /// children for this operator
- fn required_child_distribution(&self) -> Distribution {
- Distribution::UnspecifiedDistribution
+ /// Specifies the data distribution requirements for all the
+ /// children of this operator. By default it is [`Distribution::UnspecifiedDistribution`] for each child.
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if !self.children().is_empty() {
+ vec![Distribution::UnspecifiedDistribution; self.children().len()]
+ } else {
+ vec![Distribution::UnspecifiedDistribution]
+ }
+ }
+
+ /// Specifies the ordering requirements for all the
+ /// children for this operator.
+ fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+ vec![None; self.children().len()]
}
/// Returns `true` if this operator relies on its inputs being
@@ -136,13 +146,17 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// optimizations which might reorder the inputs (such as
/// repartitioning to increase concurrency).
///
- /// The default implementation returns `true`
+ /// The default implementation checks the input ordering requirements:
+ /// if any child has a non-empty ordering requirement, the method returns
+ /// `true`.
///
/// WARNING: if you override this default and return `false`, your
/// operator can not rely on DataFusion preserving the input order
/// as it will likely not.
fn relies_on_input_order(&self) -> bool {
- true
+ self.required_input_ordering()
+ .iter()
+ .any(|ordering| matches!(ordering, Some(_)))
}
/// Returns `false` if this operator's implementation may reorder
@@ -175,10 +189,15 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn benefits_from_input_partitioning(&self) -> bool {
// By default try to maximize parallelism with more CPUs if
// possible
- !matches!(
- self.required_child_distribution(),
- Distribution::SinglePartition
- )
+ !self
+ .required_input_distribution()
+ .into_iter()
+ .any(|dist| matches!(dist, Distribution::SinglePartition))
+ }
+
+ /// Get the EquivalenceProperties within the plan
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ EquivalenceProperties::new()
}
/// Get a list of child execution plans that provide the input for this plan. The returned list
@@ -460,6 +479,23 @@ impl Partitioning {
}
}
+impl PartialEq for Partitioning {
+ fn eq(&self, other: &Partitioning) -> bool {
+ match (self, other) {
+ (
+ Partitioning::RoundRobinBatch(count1),
+ Partitioning::RoundRobinBatch(count2),
+ ) if count1 == count2 => true,
+ (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
+ if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) =>
+ {
+ true
+ }
+ _ => false,
+ }
+ }
+}
+
/// Distribution schemes
#[derive(Debug, Clone)]
pub enum Distribution {
@@ -472,7 +508,10 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
+use datafusion_physical_expr::expr_list_eq_strict_order;
+use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::EquivalenceProperties;
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
/// Applies an optional projection to a [`SchemaRef`], returning the
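A usage sketch of the new PartialEq for Partitioning above: Hash specs compare equal only with identical expression lists in identical order and the same partition count, and UnknownPartitioning never compares equal, not even with itself, because the catch-all arm returns false.

use std::sync::Arc;

use datafusion::physical_plan::{Partitioning, PhysicalExpr};
use datafusion_physical_expr::expressions::Column;

fn main() {
    let a1: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a", 0))];
    let a2: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a", 0))];
    let b: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("b", 1))];

    // Same expressions, same order, same count: equal.
    assert_eq!(Partitioning::Hash(a1, 4), Partitioning::Hash(a2, 4));
    // Different partition counts: not equal.
    assert_ne!(Partitioning::Hash(b.clone(), 4), Partitioning::Hash(b, 8));
    // UnknownPartitioning falls through to the `_ => false` arm.
    assert_ne!(
        Partitioning::UnknownPartitioning(4),
        Partitioning::UnknownPartitioning(4)
    );
}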
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index a0fbba2f8..88a43e111 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -522,8 +522,9 @@ impl DefaultPhysicalPlanner {
&& session_state.config.target_partitions > 1
&& session_state.config.repartition_windows;
- let input_exec = if can_repartition {
- let partition_keys = partition_keys
+ let physical_partition_keys = if can_repartition
+ {
+ partition_keys
.iter()
.map(|e| {
self.create_physical_expr(
@@ -533,11 +534,16 @@ impl DefaultPhysicalPlanner {
session_state,
)
})
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
+ } else {
+ vec![]
+ };
+
+ let input_exec = if can_repartition {
Arc::new(RepartitionExec::try_new(
input_exec,
Partitioning::Hash(
- partition_keys,
+ physical_partition_keys.clone(),
session_state.config.target_partitions,
),
)?)
@@ -576,8 +582,8 @@ impl DefaultPhysicalPlanner {
let logical_input_schema = input.schema();
- let input_exec = if sort_keys.is_empty() {
- input_exec
+ let physical_sort_keys = if sort_keys.is_empty() {
+ None
} else {
let physical_input_schema = input_exec.schema();
let sort_keys = sort_keys
@@ -600,13 +606,19 @@ impl DefaultPhysicalPlanner {
_ => unreachable!(),
})
.collect::<Result<Vec<_>>>()?;
- Arc::new(if can_repartition {
- SortExec::new_with_partitioning(sort_keys, input_exec, true, None)
- } else {
- SortExec::try_new(sort_keys, input_exec, None)?
- })
+ Some(sort_keys)
};
+ let input_exec = match physical_sort_keys.clone() {
+ None => input_exec,
+ Some(sort_exprs) => {
+ if can_repartition {
+ Arc::new(SortExec::new_with_partitioning(sort_exprs, input_exec, true, None))
+ } else {
+ Arc::new(SortExec::try_new(sort_exprs, input_exec, None)?)
+ }
+ },
+ };
let physical_input_schema = input_exec.schema();
let window_expr = window_expr
.iter()
@@ -624,6 +636,8 @@ impl DefaultPhysicalPlanner {
window_expr,
input_exec,
physical_input_schema,
+ physical_partition_keys,
+ physical_sort_keys,
)?))
}
LogicalPlan::Aggregate(Aggregate {
@@ -2308,10 +2322,6 @@ mod tests {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 5fa3c93cd..2b6297f8c 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -21,14 +21,15 @@
//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
use std::any::Any;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::Result;
use crate::physical_plan::{
- ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+ ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
+ Partitioning, PhysicalExpr,
};
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
@@ -39,6 +40,7 @@ use super::expressions::{Column, PhysicalSortExpr};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use crate::execution::context::TaskContext;
+use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
use futures::stream::Stream;
use futures::stream::StreamExt;
@@ -51,6 +53,11 @@ pub struct ProjectionExec {
schema: SchemaRef,
/// The input plan
input: Arc<dyn ExecutionPlan>,
+ /// The output ordering
+ output_ordering: Option<Vec<PhysicalSortExpr>>,
+ /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr
+ /// The key is the column from the input schema and the values are the columns from the output schema
+ alias_map: HashMap<Column, Vec<Column>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -82,10 +89,47 @@ impl ProjectionExec {
input_schema.metadata().clone(),
));
+ let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+ for (expression, name) in expr.iter() {
+ if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+ let new_col_idx = schema.index_of(name)?;
+ // Even when the column name is unchanged, a different index means the column acts as an alias
+ if (column.name() != name) || (column.index() != new_col_idx) {
+ let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new);
+ entry.push(Column::new(name, new_col_idx));
+ }
+ };
+ }
+
+ // The output ordering needs to respect the aliases
+ let child_output_ordering = input.output_ordering();
+ let output_ordering = match child_output_ordering {
+ Some(sort_exprs) => {
+ let normalized_exprs = sort_exprs
+ .iter()
+ .map(|sort_expr| {
+ let expr = normalize_out_expr_with_alias_schema(
+ sort_expr.expr.clone(),
+ &alias_map,
+ &schema,
+ );
+ PhysicalSortExpr {
+ expr,
+ options: sort_expr.options,
+ }
+ })
+ .collect::<Vec<_>>();
+ Some(normalized_exprs)
+ }
+ None => None,
+ };
+
Ok(Self {
expr,
schema,
input: input.clone(),
+ output_ordering,
+ alias_map,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -118,11 +162,28 @@ impl ExecutionPlan for ProjectionExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ // The output partitioning needs to respect the aliases
+ let input_partition = self.input.output_partitioning();
+ match input_partition {
+ Partitioning::Hash(exprs, part) => {
+ let normalized_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ normalize_out_expr_with_alias_schema(
+ expr,
+ &self.alias_map,
+ &self.schema,
+ )
+ })
+ .collect::<Vec<_>>();
+ Partitioning::Hash(normalized_exprs, part)
+ }
+ _ => input_partition,
+ }
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
+ self.output_ordering.as_deref()
}
fn maintains_input_order(&self) -> bool {
@@ -130,8 +191,15 @@ impl ExecutionPlan for ProjectionExec {
true
}
- fn relies_on_input_order(&self) -> bool {
- false
+ // Equivalence properties need to be adjusted after the Projection.
+ // 1) Add Alias, Alias can introduce additional equivalence properties,
+ // For example: Projection(a, a as a1, a as a2)
+ // 2) Truncate the properties that are not in the schema of the Projection
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ let mut input_equivalence_properties = self.input.equivalence_properties();
+ input_equivalence_properties.merge_properties_with_alias(&self.alias_map);
+ input_equivalence_properties.truncate_properties_not_in_schema(&self.schema);
+ input_equivalence_properties
}
fn with_new_children(
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 4c057b1d6..3e50ec645 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -25,7 +25,9 @@ use std::{any::Any, vec};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
+use crate::physical_plan::{
+ DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics,
+};
use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
@@ -272,10 +274,6 @@ impl ExecutionPlan for RepartitionExec {
vec![self.input.clone()]
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -294,6 +292,10 @@ impl ExecutionPlan for RepartitionExec {
None
}
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
+ }
+
fn execute(
&self,
partition: usize,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index cd14f27b6..fb289b68b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -46,6 +46,7 @@ use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
+use datafusion_physical_expr::EquivalenceProperties;
use futures::lock::Mutex;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use log::{debug, error};
@@ -743,11 +744,13 @@ impl ExecutionPlan for SortExec {
}
}
- fn required_child_distribution(&self) -> Distribution {
+ fn required_input_distribution(&self) -> Vec<Distribution> {
if self.preserve_partitioning {
- Distribution::UnspecifiedDistribution
+ vec![Distribution::UnspecifiedDistribution]
} else {
- Distribution::SinglePartition
+ // global sort
+ // TODO support RangePartition and OrderedDistribution
+ vec![Distribution::SinglePartition]
}
}
@@ -755,11 +758,6 @@ impl ExecutionPlan for SortExec {
vec![self.input.clone()]
}
- fn relies_on_input_order(&self) -> bool {
- // this operator resorts everything
- false
- }
-
fn benefits_from_input_partitioning(&self) -> bool {
false
}
@@ -768,6 +766,10 @@ impl ExecutionPlan for SortExec {
Some(&self.expr)
}
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 17dea38c0..2fe5bc313 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -49,6 +49,7 @@ use crate::physical_plan::{
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
+use datafusion_physical_expr::EquivalenceProperties;
/// Sort preserving merge execution plan
///
@@ -123,18 +124,22 @@ impl ExecutionPlan for SortPreservingMergeExec {
Partitioning::UnknownPartitioning(1)
}
- fn required_child_distribution(&self) -> Distribution {
- Distribution::UnspecifiedDistribution
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::UnspecifiedDistribution]
}
- fn relies_on_input_order(&self) -> bool {
- true
+ fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+ vec![Some(&self.expr)]
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Some(&self.expr)
}
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index bf9dfbd1b..af57c9ef9 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -21,15 +21,19 @@
//! The Union operator combines multiple inputs with the same schema
+use std::pin::Pin;
+use std::task::{Context, Poll};
use std::{any::Any, sync::Arc};
+use arrow::error::Result as ArrowResult;
use arrow::{
datatypes::{Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
-use futures::StreamExt;
+use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::debug;
+use log::warn;
use super::{
expressions::PhysicalSortExpr,
@@ -42,6 +46,8 @@ use crate::{
error::Result,
physical_plan::{expressions, metrics::BaselineMetrics},
};
+use datafusion_physical_expr::sort_expr_list_eq_strict_order;
+use tokio::macros::support::thread_rng_n;
/// UNION ALL execution plan
#[derive(Debug)]
@@ -52,6 +58,8 @@ pub struct UnionExec {
metrics: ExecutionPlanMetricsSet,
/// Schema of Union
schema: SchemaRef,
+ /// Partition aware Union
+ partition_aware: bool,
}
impl UnionExec {
@@ -78,10 +86,24 @@ impl UnionExec {
inputs[0].schema().metadata().clone(),
));
+ // The UnionExec is partition aware if all of its inputs have the same Hash
+ // partition spec as the first_input_partition.
+ //
+ // This check might be too strict when the input partition specs are compatible but not exactly the same.
+ // For example, one input may have the partition spec Hash('a','b','c') while
+ // another has Hash('a'); in such cases it may still be possible to derive a common output partitioning.
+ let first_input_partition = inputs[0].output_partitioning();
+ let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _))
+ && inputs
+ .iter()
+ .map(|plan| plan.output_partitioning())
+ .all(|partition| partition == first_input_partition);
+
UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
schema,
+ partition_aware,
}
}
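Note (illustration, not part of the commit): the strict equality check above rejects compatible-but-different Hash specs. A small hypothetical test showing the behavior, relying on the Partitioning equality comparison used in this commit:

    use std::sync::Arc;
    use datafusion::physical_plan::{Partitioning, PhysicalExpr};
    use datafusion_physical_expr::expressions::Column;

    #[test]
    fn union_partition_awareness_is_strict() {
        let a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
        let b: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));
        // Identical specs on every input -> the union is partition aware.
        assert_eq!(
            Partitioning::Hash(vec![a.clone()], 8),
            Partitioning::Hash(vec![a.clone()], 8)
        );
        // Compatible but not identical specs -> rejected by the strict check.
        assert_ne!(
            Partitioning::Hash(vec![a.clone()], 8),
            Partitioning::Hash(vec![a, b], 8)
        );
    }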
@@ -107,23 +129,46 @@ impl ExecutionPlan for UnionExec {
/// Output of the union is the combination of all output partitions of the inputs
fn output_partitioning(&self) -> Partitioning {
- // Sums all the output partitions
- let num_partitions = self
- .inputs
- .iter()
- .map(|plan| plan.output_partitioning().partition_count())
- .sum();
- // TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`)
- // https://issues.apache.org/jira/browse/ARROW-11991
- Partitioning::UnknownPartitioning(num_partitions)
+ if self.partition_aware {
+ self.inputs[0].output_partitioning()
+ } else {
+ // Output the combination of all output partitions of the inputs if the Union is not partition aware
+ let num_partitions = self
+ .inputs
+ .iter()
+ .map(|plan| plan.output_partitioning().partition_count())
+ .sum();
+
+ Partitioning::UnknownPartitioning(num_partitions)
+ }
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn relies_on_input_order(&self) -> bool {
- false
+ let first_input_ordering = self.inputs[0].output_ordering();
+ // If the Union is not partition aware and every input's ordering spec is strictly
+ // equal to first_input_ordering, return first_input_ordering as the output_ordering.
+ //
+ // This check might be too strict when the input orderings are compatible but not exactly the same.
+ // For example, one input may have the ordering spec SortExpr('a','b','c') while the other has
+ // SortExpr('a'); it is safe to derive the output ordering from the common prefix SortExpr('a').
+ if !self.partition_aware
+ && first_input_ordering.is_some()
+ && self
+ .inputs
+ .iter()
+ .map(|plan| plan.output_ordering())
+ .all(|ordering| {
+ ordering.is_some()
+ && sort_expr_list_eq_strict_order(
+ ordering.unwrap(),
+ first_input_ordering.unwrap(),
+ )
+ })
+ {
+ first_input_ordering
+ } else {
+ None
+ }
}
fn with_new_children(
@@ -145,19 +190,38 @@ impl ExecutionPlan for UnionExec {
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer(); // record on drop
- // find partition to execute
- for input in self.inputs.iter() {
- // Calculate whether partition belongs to the current partition
- if partition < input.output_partitioning().partition_count() {
- let stream = input.execute(partition, context)?;
- debug!("Found a Union partition to execute");
+ if self.partition_aware {
+ let mut input_stream_vec = vec![];
+ for input in self.inputs.iter() {
+ if partition < input.output_partitioning().partition_count() {
+ input_stream_vec.push(input.execute(partition, context.clone())?);
+ } else {
+ // The requested partition does not exist in this input; stop collecting streams
+ break;
+ }
+ }
+ if input_stream_vec.len() == self.inputs.len() {
+ let stream = Box::pin(CombinedRecordBatchStream::new(
+ self.schema(),
+ input_stream_vec,
+ ));
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
- } else {
- partition -= input.output_partitioning().partition_count();
+ }
+ } else {
+ // find partition to execute
+ for input in self.inputs.iter() {
+ // Check whether the requested partition index falls into the current input
+ if partition < input.output_partitioning().partition_count() {
+ let stream = input.execute(partition, context)?;
+ debug!("Found a Union partition to execute");
+ return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+ } else {
+ partition -= input.output_partitioning().partition_count();
+ }
}
}
- debug!("Error in Union: Partition {} not found", partition);
+ warn!("Error in Union: Partition {} not found", partition);
Err(crate::error::DataFusionError::Execution(format!(
"Partition {} not found in Union",
@@ -194,6 +258,73 @@ impl ExecutionPlan for UnionExec {
}
}
+/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
+pub struct CombinedRecordBatchStream {
+ /// Schema wrapped by Arc
+ schema: SchemaRef,
+ /// Stream entries
+ entries: Vec<SendableRecordBatchStream>,
+}
+
+impl CombinedRecordBatchStream {
+ /// Create a CombinedRecordBatchStream
+ pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self {
+ Self { schema, entries }
+ }
+}
+
+impl RecordBatchStream for CombinedRecordBatchStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+impl Stream for CombinedRecordBatchStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ use Poll::*;
+
+ let start = thread_rng_n(self.entries.len() as u32) as usize;
+ let mut idx = start;
+
+ for _ in 0..self.entries.len() {
+ let stream = self.entries.get_mut(idx).unwrap();
+
+ match Pin::new(stream).poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {
+ // Remove the entry
+ self.entries.swap_remove(idx);
+
+ // Check if this was the last entry, if so the cursor needs
+ // to wrap
+ if idx == self.entries.len() {
+ idx = 0;
+ } else if idx < start && start <= self.entries.len() {
+ // The stream being swapped into the current index has
+ // already been polled, so skip it.
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ Pending => {
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ }
+
+ // If there are no entries left, the combined stream is complete.
+ if self.entries.is_empty() {
+ Ready(None)
+ } else {
+ Pending
+ }
+ }
+}
+
/// Stream wrapper that records `BaselineMetrics` for a particular
/// partition
struct ObservedStream {
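Note (illustration, not part of the commit): CombinedRecordBatchStream polls its entries starting at a random index (thread_rng_n) so no single input can starve the others. A hedged usage sketch, assuming the public MemoryStream from physical_plan::memory (empty inputs are used only to keep the example small):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::error::Result;
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::physical_plan::union::CombinedRecordBatchStream;
    use datafusion::physical_plan::SendableRecordBatchStream;

    fn combine_two_inputs() -> Result<SendableRecordBatchStream> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        // One stream per input, all for the same output partition.
        let s1: SendableRecordBatchStream =
            Box::pin(MemoryStream::try_new(vec![], schema.clone(), None)?);
        let s2: SendableRecordBatchStream =
            Box::pin(MemoryStream::try_new(vec![], schema.clone(), None)?);
        Ok(Box::pin(CombinedRecordBatchStream::new(schema, vec![s1, s2])))
    }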
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index 897936814..6ab4f7b82 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -22,8 +22,8 @@ use super::{common, SendableRecordBatchStream, Statistics};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::{
- memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan,
- Partitioning, PhysicalExpr,
+ memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
+ PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::array::new_null_array;
@@ -108,11 +108,6 @@ impl ExecutionPlan for ValuesExec {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
-
- fn required_child_distribution(&self) -> Distribution {
- Distribution::UnspecifiedDistribution
- }
-
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
@@ -122,10 +117,6 @@ impl ExecutionPlan for ValuesExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index bece2a50c..a488c6ffa 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -217,6 +217,8 @@ mod tests {
],
input,
schema.clone(),
+ vec![],
+ None,
)?);
let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
@@ -262,6 +264,8 @@ mod tests {
)?],
blocking_exec,
schema,
+ vec![],
+ None,
)?);
let fut = collect(window_agg_exec, task_ctx);
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index e9eac35a3..76ad0afb1 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,8 +24,9 @@ use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::{
- common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
- Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+ common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+ ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+ SendableRecordBatchStream, Statistics, WindowExpr,
};
use arrow::{
array::ArrayRef,
@@ -35,6 +36,7 @@ use arrow::{
};
use futures::stream::Stream;
use futures::{ready, StreamExt};
+use log::warn;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
@@ -51,6 +53,10 @@ pub struct WindowAggExec {
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
+ /// Partition Keys
+ pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+ /// Sort Keys
+ pub sort_keys: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -61,6 +67,8 @@ impl WindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
+ partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+ sort_keys: Option<Vec<PhysicalSortExpr>>,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
@@ -69,6 +77,8 @@ impl WindowAggExec {
window_expr,
schema,
input_schema,
+ partition_keys,
+ sort_keys,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
true
}
- fn relies_on_input_order(&self) -> bool {
- true
+ fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+ let sort_keys = self.sort_keys.as_deref();
+ vec![sort_keys]
}
- fn required_child_distribution(&self) -> Distribution {
- if self
- .window_expr()
- .iter()
- .all(|expr| expr.partition_by().is_empty())
- {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.partition_keys.is_empty() {
+ warn!("No partition defined for WindowAggExec!!!");
+ vec![Distribution::SinglePartition]
} else {
- Distribution::UnspecifiedDistribution
+ // TODO: support PartitionCollections if there are no common partition columns in the window_expr
+ vec![Distribution::HashPartitioned(self.partition_keys.clone())]
}
}
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ self.input.equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -143,6 +156,8 @@ impl ExecutionPlan for WindowAggExec {
self.window_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
+ self.partition_keys.clone(),
+ self.sort_keys.clone(),
)?))
}
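Note (illustration, not part of the commit): the distribution requirement is now driven by the explicit partition_keys rather than by inspecting the window expressions. A minimal sketch mirroring that choice (the free function window_required_distribution is hypothetical; the real logic lives in the method above):

    use std::sync::Arc;
    use datafusion::physical_plan::{Distribution, PhysicalExpr};
    use datafusion_physical_expr::expressions::Column;

    fn window_required_distribution(
        partition_keys: &[Arc<dyn PhysicalExpr>],
    ) -> Vec<Distribution> {
        if partition_keys.is_empty() {
            // No PARTITION BY columns: every row must see the whole input.
            vec![Distribution::SinglePartition]
        } else {
            // Ask the enforcer to hash-partition the input on the keys.
            vec![Distribution::HashPartitioned(partition_keys.to_vec())]
        }
    }

    // e.g. window_required_distribution(&[Arc::new(Column::new("c1", 0))])
    //      yields [Distribution::HashPartitioned([c1@0])]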
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
index 20e7c6e79..8ecece85e 100644
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -235,8 +235,8 @@ impl ExecutionPlan for ProxyExecutionPlan {
self.inner.output_ordering()
}
- fn required_child_distribution(&self) -> Distribution {
- self.inner.required_child_distribution()
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ self.inner.required_input_distribution()
}
fn relies_on_input_order(&self) -> bool {
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 7e9ece36b..3fb810e3a 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -441,12 +441,8 @@ impl ExecutionPlan for TopKExec {
None
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
- fn required_child_distribution(&self) -> Distribution {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::SinglePartition]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
new file mode 100644
index 000000000..411a492a5
--- /dev/null
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expressions::Column;
+
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+/// EquivalenceProperties is a collection of EquivalentClasses.
+#[derive(Debug, Default, Clone)]
+pub struct EquivalenceProperties {
+ classes: Vec<EquivalentClass>,
+}
+
+impl EquivalenceProperties {
+ pub fn new() -> Self {
+ EquivalenceProperties { classes: vec![] }
+ }
+
+ pub fn classes(&self) -> &[EquivalentClass] {
+ &self.classes
+ }
+
+ pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I) {
+ self.classes.extend(iter)
+ }
+
+ /// Add new equal conditions into the EquivalenceProperties; the new conditions usually come from
+ /// equality predicates in a Join or Filter.
+ pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) {
+ let mut idx1: Option<usize> = None;
+ let mut idx2: Option<usize> = None;
+ for (idx, class) in self.classes.iter_mut().enumerate() {
+ let contains_first = class.contains(new_conditions.0);
+ let contains_second = class.contains(new_conditions.1);
+ match (contains_first, contains_second) {
+ (true, false) => {
+ class.insert(new_conditions.1.clone());
+ idx1 = Some(idx);
+ }
+ (false, true) => {
+ class.insert(new_conditions.0.clone());
+ idx2 = Some(idx);
+ }
+ (true, true) => {
+ idx1 = Some(idx);
+ idx2 = Some(idx);
+ break;
+ }
+ (false, false) => {}
+ }
+ }
+
+ match (idx1, idx2) {
+ (Some(idx_1), Some(idx_2)) if idx_1 != idx_2 => {
+ // need to merge the two existing EquivalentClasses
+ let second_eq_class = self.classes.get(idx_2).unwrap().clone();
+ let first_eq_class = self.classes.get_mut(idx_1).unwrap();
+ for prop in second_eq_class.iter() {
+ if !first_eq_class.contains(prop) {
+ first_eq_class.insert(prop.clone());
+ }
+ }
+ self.classes.remove(idx_2);
+ }
+ (None, None) => {
+ // adding new pairs
+ self.classes.push(EquivalentClass::new(
+ new_conditions.0.clone(),
+ vec![new_conditions.1.clone()],
+ ));
+ }
+ _ => {}
+ }
+ }
+
+ pub fn merge_properties_with_alias(
+ &mut self,
+ alias_map: &HashMap<Column, Vec<Column>>,
+ ) {
+ for (column, columns) in alias_map {
+ let mut find_match = false;
+ for class in self.classes.iter_mut() {
+ if class.contains(column) {
+ for col in columns {
+ class.insert(col.clone());
+ }
+ find_match = true;
+ break;
+ }
+ }
+ if !find_match {
+ self.classes
+ .push(EquivalentClass::new(column.clone(), columns.clone()));
+ }
+ }
+ }
+
+ pub fn truncate_properties_not_in_schema(&mut self, schema: &SchemaRef) {
+ for class in self.classes.iter_mut() {
+ let mut columns_to_remove = vec![];
+ for column in class.iter() {
+ if let Ok(idx) = schema.index_of(column.name()) {
+ if idx != column.index() {
+ columns_to_remove.push(column.clone());
+ }
+ } else {
+ columns_to_remove.push(column.clone());
+ }
+ }
+ for column in columns_to_remove {
+ class.remove(&column);
+ }
+ }
+ self.classes.retain(|props| props.len() > 1);
+ }
+}
+
+/// An EquivalentClass is a set of Columns that are known to have the same value in all tuples of a relation.
+/// It is generated by equality predicates, typically equijoin conditions and equality conditions in filters.
+#[derive(Debug, Clone)]
+pub struct EquivalentClass {
+ /// First element in the EquivalentClass
+ head: Column,
+ /// Other equal columns
+ others: HashSet<Column>,
+}
+
+impl EquivalentClass {
+ pub fn new(head: Column, others: Vec<Column>) -> Self {
+ EquivalentClass {
+ head,
+ others: HashSet::from_iter(others),
+ }
+ }
+
+ pub fn head(&self) -> &Column {
+ &self.head
+ }
+
+ pub fn others(&self) -> &HashSet<Column> {
+ &self.others
+ }
+
+ pub fn contains(&self, col: &Column) -> bool {
+ self.head == *col || self.others.contains(col)
+ }
+
+ pub fn insert(&mut self, col: Column) -> bool {
+ self.others.insert(col)
+ }
+
+ pub fn remove(&mut self, col: &Column) -> bool {
+ let removed = self.others.remove(col);
+ if !removed && *col == self.head {
+ let one_col = self.others.iter().next().cloned();
+ if let Some(col) = one_col {
+ let removed = self.others.remove(&col);
+ self.head = col;
+ removed
+ } else {
+ false
+ }
+ } else {
+ true
+ }
+ }
+
+ pub fn iter(&self) -> impl Iterator<Item = &'_ Column> {
+ std::iter::once(&self.head).chain(self.others.iter())
+ }
+
+ pub fn len(&self) -> usize {
+ self.others.len() + 1
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::expressions::Column;
+ use datafusion_common::Result;
+
+ #[test]
+ fn add_equal_conditions_test() -> Result<()> {
+ let mut eq_properties = EquivalenceProperties::new();
+ let new_condition = (&Column::new("a", 0), &Column::new("b", 1));
+ eq_properties.add_equal_conditions(new_condition);
+ assert_eq!(eq_properties.classes().len(), 1);
+
+ let new_condition = (&Column::new("b", 1), &Column::new("a", 0));
+ eq_properties.add_equal_conditions(new_condition);
+ assert_eq!(eq_properties.classes().len(), 1);
+ assert_eq!(eq_properties.classes()[0].len(), 2);
+
+ let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
+ eq_properties.add_equal_conditions(new_condition);
+ assert_eq!(eq_properties.classes().len(), 1);
+ assert_eq!(eq_properties.classes()[0].len(), 3);
+
+ let new_condition = (&Column::new("x", 99), &Column::new("y", 100));
+ eq_properties.add_equal_conditions(new_condition);
+ assert_eq!(eq_properties.classes().len(), 2);
+
+ let new_condition = (&Column::new("x", 99), &Column::new("a", 0));
+ eq_properties.add_equal_conditions(new_condition);
+ assert_eq!(eq_properties.classes().len(), 1);
+ assert_eq!(eq_properties.classes()[0].len(), 5);
+
+ Ok(())
+ }
+
+ #[test]
+ fn merge_equivalence_properties_with_alias_test() -> Result<()> {
+ let mut eq_properties = EquivalenceProperties::new();
+ let mut alias_map = HashMap::new();
+ alias_map.insert(
+ Column::new("a", 0),
+ vec![Column::new("a1", 1), Column::new("a2", 2)],
+ );
+
+ eq_properties.merge_properties_with_alias(&alias_map);
+ assert_eq!(eq_properties.classes().len(), 1);
+ assert_eq!(eq_properties.classes()[0].len(), 3);
+
+ let mut alias_map = HashMap::new();
+ alias_map.insert(
+ Column::new("a", 0),
+ vec![Column::new("a3", 1), Column::new("a4", 2)],
+ );
+ eq_properties.merge_properties_with_alias(&alias_map);
+ assert_eq!(eq_properties.classes().len(), 1);
+ assert_eq!(eq_properties.classes()[0].len(), 5);
+ Ok(())
+ }
+}
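Note (illustration, not part of the commit): together with the normalization helpers added to utils.rs below, these classes let the planner treat a sort on "b" as a sort on "a" once a = b has been recorded. A hedged sketch (the test name is hypothetical):

    use std::sync::Arc;
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::{
        normalize_sort_expr_with_equivalence_properties, EquivalenceProperties,
        PhysicalSortExpr,
    };

    #[test]
    fn sort_on_b_normalizes_to_head_a() {
        let mut eq = EquivalenceProperties::new();
        eq.add_equal_conditions((&Column::new("a", 0), &Column::new("b", 1)));

        let sort_on_b = PhysicalSortExpr {
            expr: Arc::new(Column::new("b", 1)),
            options: SortOptions::default(),
        };
        let normalized =
            normalize_sort_expr_with_equivalence_properties(sort_on_b, eq.classes());
        // "b" is replaced by the head of its equivalence class, "a".
        let col = normalized.expr.as_any().downcast_ref::<Column>().unwrap();
        assert_eq!(col.name(), "a");
    }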
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 450919aa3..776f4dc0a 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -118,6 +118,76 @@ impl PartialEq<dyn Any> for Column {
}
}
+/// Represents an unknown column, identified by name only (it has no index into a schema)
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
+pub struct UnKnownColumn {
+ name: String,
+}
+
+impl UnKnownColumn {
+ /// Create a new unknown column expression
+ pub fn new(name: &str) -> Self {
+ Self {
+ name: name.to_owned(),
+ }
+ }
+
+ /// Get the column name
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+impl std::fmt::Display for UnKnownColumn {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.name)
+ }
+}
+
+impl PhysicalExpr for UnKnownColumn {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ /// Get the data type of this expression, given the schema of the input
+ fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+ Ok(DataType::Null)
+ }
+
+ /// Decide whether this expression is nullable, given the schema of the input
+ fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+ Ok(true)
+ }
+
+ /// Evaluate the expression
+ fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
+ Err(DataFusionError::Plan(
+ "UnKnownColumn::evaluate() should not be called".to_owned(),
+ ))
+ }
+
+ fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ Ok(self)
+ }
+}
+
+impl PartialEq<dyn Any> for UnKnownColumn {
+ fn eq(&self, other: &dyn Any) -> bool {
+ down_cast_any_ref(other)
+ .downcast_ref::<Self>()
+ .map(|x| self == x)
+ .unwrap_or(false)
+ }
+}
+
#[derive(Debug, Clone)]
struct ColumnExprStats {
index: usize,
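Note (illustration, not part of the commit): UnKnownColumn is purely a placeholder produced by normalize_out_expr_with_alias_schema (added to utils.rs below) for columns that no longer resolve against the current schema; it carries a name but refuses to evaluate. A tiny hypothetical sketch:

    use datafusion_physical_expr::expressions::UnKnownColumn;

    fn main() {
        let col = UnKnownColumn::new("a");
        assert_eq!(col.name(), "a");
        // evaluate() would return a plan error: the expression only marks a
        // column that no longer maps into the current schema.
    }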
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index d27737dd9..26bb3ca1e 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -72,7 +72,7 @@ pub use case::{case, CaseExpr};
pub use cast::{
cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
};
-pub use column::{col, Column};
+pub use column::{col, Column, UnKnownColumn};
pub use datetime::DateTimeIntervalExpr;
pub use get_indexed_field::GetIndexedFieldExpr;
pub use in_list::{in_list, InListExpr};
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index d2b899dca..026329e84 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -21,6 +21,7 @@ pub mod conditional_expressions;
#[cfg(feature = "crypto_expressions")]
pub mod crypto_expressions;
pub mod datetime_expressions;
+pub mod equivalence;
pub mod execution_props;
pub mod expressions;
pub mod functions;
@@ -46,7 +47,15 @@ pub mod window;
// reexport this to maintain compatibility with anything that used from_slice previously
pub use aggregate::AggregateExpr;
pub use datafusion_common::from_slice;
+pub use equivalence::EquivalenceProperties;
+pub use equivalence::EquivalentClass;
pub use physical_expr::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::PhysicalSortExpr;
+pub use utils::{
+ expr_list_eq_any_order, expr_list_eq_strict_order,
+ normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema,
+ normalize_sort_expr_with_equivalence_properties, sort_expr_list_eq_strict_order,
+ split_conjunction,
+};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 7ee947c0a..78ce9c931 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,9 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use crate::equivalence::EquivalentClass;
+use crate::expressions::BinaryExpr;
+use crate::expressions::Column;
+use crate::expressions::UnKnownColumn;
+use crate::rewrite::TreeNodeRewritable;
use crate::PhysicalExpr;
use crate::PhysicalSortExpr;
+use datafusion_expr::Operator;
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
use std::sync::Arc;
/// Compare the two expr lists are equal no matter the order.
@@ -65,6 +74,117 @@ pub fn sort_expr_list_eq_strict_order(
list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
}
+/// Assuming the predicate is in CNF form, split it into a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+pub fn split_conjunction(
+ predicate: &Arc<dyn PhysicalExpr>,
+) -> Vec<&Arc<dyn PhysicalExpr>> {
+ split_conjunction_impl(predicate, vec![])
+}
+
+fn split_conjunction_impl<'a>(
+ predicate: &'a Arc<dyn PhysicalExpr>,
+ mut exprs: Vec<&'a Arc<dyn PhysicalExpr>>,
+) -> Vec<&'a Arc<dyn PhysicalExpr>> {
+ match predicate.as_any().downcast_ref::<BinaryExpr>() {
+ Some(binary) => match binary.op() {
+ Operator::And => {
+ let exprs = split_conjunction_impl(binary.left(), exprs);
+ split_conjunction_impl(binary.right(), exprs)
+ }
+ _ => {
+ exprs.push(predicate);
+ exprs
+ }
+ },
+ None => {
+ exprs.push(predicate);
+ exprs
+ }
+ }
+}
+
+/// Normalize the output expressions based on the alias map and SchemaRef.
+///
+/// 1) If there is a mapping in the alias map, replace the Column in the output expressions with the first Column in the alias map
+/// 2) If the Column is invalid for the current schema, replace it with a placeholder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+ expr: Arc<dyn PhysicalExpr>,
+ alias_map: &HashMap<Column, Vec<Column>>,
+ schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+ let expr_clone = expr.clone();
+ expr_clone
+ .transform(&|expr| {
+ let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+ match expr.as_any().downcast_ref::<Column>() {
+ Some(column) => {
+ let out = alias_map
+ .get(column)
+ .map(|c| {
+ let out_col: Arc<dyn PhysicalExpr> =
+ Arc::new(c[0].clone());
+ out_col
+ })
+ .or_else(|| match schema.index_of(column.name()) {
+ // Exact match: return None, no transform needed
+ Ok(idx) if column.index() == idx => None,
+ _ => {
+ let out_col: Arc<dyn PhysicalExpr> =
+ Arc::new(UnKnownColumn::new(column.name()));
+ Some(out_col)
+ }
+ });
+ out
+ }
+ None => None,
+ };
+ normalized_form
+ })
+ .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+ expr: Arc<dyn PhysicalExpr>,
+ eq_properties: &[EquivalentClass],
+) -> Arc<dyn PhysicalExpr> {
+ let expr_clone = expr.clone();
+ expr_clone
+ .transform(&|expr| match expr.as_any().downcast_ref::<Column>() {
+ Some(column) => {
+ let mut normalized: Option<Arc<dyn PhysicalExpr>> = None;
+ for class in eq_properties {
+ if class.contains(column) {
+ normalized = Some(Arc::new(class.head().clone()));
+ break;
+ }
+ }
+ normalized
+ }
+ None => None,
+ })
+ .unwrap_or(expr)
+}
+
+pub fn normalize_sort_expr_with_equivalence_properties(
+ sort_expr: PhysicalSortExpr,
+ eq_properties: &[EquivalentClass],
+) -> PhysicalSortExpr {
+ let normalized_expr =
+ normalize_expr_with_equivalence_properties(sort_expr.expr.clone(), eq_properties);
+
+ if sort_expr.expr.ne(&normalized_expr) {
+ PhysicalSortExpr {
+ expr: normalized_expr,
+ options: sort_expr.options,
+ }
+ } else {
+ sort_expr
+ }
+}
+
#[cfg(test)]
mod tests {
@@ -77,7 +197,7 @@ mod tests {
use std::sync::Arc;
#[test]
- fn expr_list_eq_any_order_test() -> Result<()> {
+ fn expr_list_eq_test() -> Result<()> {
let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("a", 0)),
@@ -91,6 +211,15 @@ mod tests {
assert!(!expr_list_eq_any_order(list1.as_slice(), list2.as_slice()));
assert!(!expr_list_eq_any_order(list2.as_slice(), list1.as_slice()));
+ assert!(!expr_list_eq_strict_order(
+ list1.as_slice(),
+ list2.as_slice()
+ ));
+ assert!(!expr_list_eq_strict_order(
+ list2.as_slice(),
+ list1.as_slice()
+ ));
+
let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
@@ -110,6 +239,17 @@ mod tests {
assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice()));
assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice()));
+ assert!(!expr_list_eq_strict_order(
+ list3.as_slice(),
+ list4.as_slice()
+ ));
+ assert!(!expr_list_eq_strict_order(
+ list4.as_slice(),
+ list3.as_slice()
+ ));
+ assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice()));
+ assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice()));
+
Ok(())
}
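Note (illustration, not part of the commit): a hedged sketch of split_conjunction on a two-conjunct predicate (all column names hypothetical):

    use std::sync::Arc;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column};
    use datafusion_physical_expr::{split_conjunction, PhysicalExpr};

    fn main() {
        let a1_eq_a2: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a1", 0)),
            Operator::Eq,
            Arc::new(Column::new("a2", 1)),
        ));
        let b1_le_b2: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b1", 2)),
            Operator::LtEq,
            Arc::new(Column::new("b2", 3)),
        ));
        let predicate: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(a1_eq_a2, Operator::And, b1_le_b2));

        // The CNF predicate splits into its two conjuncts.
        assert_eq!(split_conjunction(&predicate).len(), 2);
    }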