You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by oz...@apache.org on 2023/11/27 06:21:32 UTC
(arrow-datafusion) branch main updated: Projection Pushdown over StreamingTableExec (#8299)
This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d81c961ccb Projection Pushdown over StreamingTableExec (#8299)
d81c961ccb is described below
commit d81c961ccb301bf12fba002474c3d2092d66d032
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Mon Nov 27 09:21:26 2023 +0300
Projection Pushdown over StreamingTableExec (#8299)
* Projection above streaming table can be removed
* Review
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
.../src/physical_optimizer/projection_pushdown.rs | 186 +++++++++++++++++++--
datafusion/physical-plan/src/streaming.rs | 29 +++-
2 files changed, 199 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 74d0de507e..c0e512ffe5 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -20,6 +20,8 @@
//! projections one by one if the operator below is amenable to this. If a
//! projection reaches a source, it can even dissappear from the plan entirely.
+use std::sync::Arc;
+
use super::output_requirements::OutputRequirementExec;
use super::PhysicalOptimizerRule;
use crate::datasource::physical_plan::CsvExec;
@@ -39,7 +41,6 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{Distribution, ExecutionPlan};
use arrow_schema::SchemaRef;
-
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::JoinSide;
@@ -47,10 +48,10 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
+use datafusion_physical_plan::streaming::StreamingTableExec;
use datafusion_physical_plan::union::UnionExec;
use itertools::Itertools;
-use std::sync::Arc;
/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
/// remove or swap with its child.
@@ -135,6 +136,8 @@ pub fn remove_unnecessary_projections(
try_swapping_with_sort_merge_join(projection, sm_join)?
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
try_swapping_with_sym_hash_join(projection, sym_join)?
+ } else if let Some(ste) = input.downcast_ref::<StreamingTableExec>() {
+ try_swapping_with_streaming_table(projection, ste)?
} else {
// If the input plan of the projection is not one of the above, we
// conservatively assume that pushing the projection down may hurt.
@@ -149,8 +152,8 @@ pub fn remove_unnecessary_projections(
Ok(maybe_modified.map_or(Transformed::No(plan), Transformed::Yes))
}
-/// Tries to swap `projection` with its input (`csv`). If possible, performs
-/// the swap and returns [`CsvExec`] as the top plan. Otherwise, returns `None`.
+/// Tries to embed `projection` to its input (`csv`). If possible, returns
+/// [`CsvExec`] as the top plan. Otherwise, returns `None`.
fn try_swapping_with_csv(
projection: &ProjectionExec,
csv: &CsvExec,
@@ -174,8 +177,8 @@ fn try_swapping_with_csv(
})
}
-/// Tries to swap `projection` with its input (`memory`). If possible, performs
-/// the swap and returns [`MemoryExec`] as the top plan. Otherwise, returns `None`.
+/// Tries to embed `projection` to its input (`memory`). If possible, returns
+/// [`MemoryExec`] as the top plan. Otherwise, returns `None`.
fn try_swapping_with_memory(
projection: &ProjectionExec,
memory: &MemoryExec,
@@ -197,10 +200,52 @@ fn try_swapping_with_memory(
.transpose()
}
+/// Tries to embed `projection` to its input (`streaming table`).
+/// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
+/// returns `None`.
+fn try_swapping_with_streaming_table(
+ projection: &ProjectionExec,
+ streaming_table: &StreamingTableExec,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ if !all_alias_free_columns(projection.expr()) {
+ return Ok(None);
+ }
+
+ let streaming_table_projections = streaming_table
+ .projection()
+ .as_ref()
+ .map(|i| i.as_ref().to_vec());
+ let new_projections =
+ new_projections_for_columns(projection, &streaming_table_projections);
+
+ let mut lex_orderings = vec![];
+ for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
+ let mut orderings = vec![];
+ for order in lex_ordering {
+ let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)?
+ else {
+ return Ok(None);
+ };
+ orderings.push(PhysicalSortExpr {
+ expr: new_ordering,
+ options: order.options,
+ });
+ }
+ lex_orderings.push(orderings);
+ }
+
+ StreamingTableExec::try_new(
+ streaming_table.partition_schema().clone(),
+ streaming_table.partitions().clone(),
+ Some(&new_projections),
+ lex_orderings,
+ streaming_table.is_infinite(),
+ )
+ .map(|e| Some(Arc::new(e) as _))
+}
+
/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
-/// Two consecutive projections can always merge into a single projection unless
-/// the [`update_expr`] function does not support one of the expression
-/// types involved in the projection.
+/// Two consecutive projections can always merge into a single projection.
fn try_unifying_projections(
projection: &ProjectionExec,
child: &ProjectionExec,
@@ -779,10 +824,6 @@ fn new_projections_for_columns(
/// given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with
/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
/// `a@0`, but `b@2` results in `None` since the projection does not include `b`.
-///
-/// If the expression contains a `PhysicalExpr` variant that this function does
-/// not support, it will return `None`. An error can only be introduced if
-/// `CaseExpr::try_new` returns an error.
fn update_expr(
expr: &Arc<dyn PhysicalExpr>,
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
@@ -1102,10 +1143,11 @@ mod tests {
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
- use arrow_schema::{DataType, Field, Schema, SortOptions};
+ use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;
+ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{ColumnarValue, Operator};
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
@@ -1115,8 +1157,11 @@ mod tests {
PhysicalSortRequirement, ScalarFunctionExpr,
};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::union::UnionExec;
+ use itertools::Itertools;
+
#[test]
fn test_update_matching_exprs() -> Result<()> {
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
@@ -1575,6 +1620,119 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_streaming_table_after_projection() -> Result<()> {
+ struct DummyStreamPartition {
+ schema: SchemaRef,
+ }
+ impl PartitionStream for DummyStreamPartition {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+ fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+ unreachable!()
+ }
+ }
+
+ let streaming_table = StreamingTableExec::try_new(
+ Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ Field::new("d", DataType::Int32, true),
+ Field::new("e", DataType::Int32, true),
+ ])),
+ vec![Arc::new(DummyStreamPartition {
+ schema: Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ Field::new("d", DataType::Int32, true),
+ Field::new("e", DataType::Int32, true),
+ ])),
+ }) as _],
+ Some(&vec![0_usize, 2, 4, 3]),
+ vec![
+ vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("e", 2)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ ],
+ vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("d", 3)),
+ options: SortOptions::default(),
+ }],
+ ]
+ .into_iter(),
+ true,
+ )?;
+ let projection = Arc::new(ProjectionExec::try_new(
+ vec![
+ (Arc::new(Column::new("d", 3)), "d".to_string()),
+ (Arc::new(Column::new("e", 2)), "e".to_string()),
+ (Arc::new(Column::new("a", 0)), "a".to_string()),
+ ],
+ Arc::new(streaming_table) as _,
+ )?) as _;
+
+ let after_optimize =
+ ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+ let result = after_optimize
+ .as_any()
+ .downcast_ref::<StreamingTableExec>()
+ .unwrap();
+ assert_eq!(
+ result.partition_schema(),
+ &Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ Field::new("d", DataType::Int32, true),
+ Field::new("e", DataType::Int32, true),
+ ]))
+ );
+ assert_eq!(
+ result.projection().clone().unwrap().to_vec(),
+ vec![3_usize, 4, 0]
+ );
+ assert_eq!(
+ result.projected_schema(),
+ &Schema::new(vec![
+ Field::new("d", DataType::Int32, true),
+ Field::new("e", DataType::Int32, true),
+ Field::new("a", DataType::Int32, true),
+ ])
+ );
+ assert_eq!(
+ result.projected_output_ordering().into_iter().collect_vec(),
+ vec![
+ vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("e", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 2)),
+ options: SortOptions::default(),
+ },
+ ],
+ vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("d", 0)),
+ options: SortOptions::default(),
+ }],
+ ]
+ );
+ assert!(result.is_infinite());
+
+ Ok(())
+ }
+
#[test]
fn test_projection_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index b0eaa2b42f..59819c6921 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -26,6 +26,7 @@ use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
+use arrow_schema::Schema;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
@@ -70,9 +71,9 @@ impl StreamingTableExec {
) -> Result<Self> {
for x in partitions.iter() {
let partition_schema = x.schema();
- if !schema.contains(partition_schema) {
+ if !schema.eq(partition_schema) {
debug!(
- "target schema does not contain partition schema. \
+ "Target schema does not match with partition schema. \
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
);
return plan_err!("Mismatch between schema and batches");
@@ -92,6 +93,30 @@ impl StreamingTableExec {
infinite,
})
}
+
+ pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
+ &self.partitions
+ }
+
+ pub fn partition_schema(&self) -> &SchemaRef {
+ self.partitions[0].schema()
+ }
+
+ pub fn projection(&self) -> &Option<Arc<[usize]>> {
+ &self.projection
+ }
+
+ pub fn projected_schema(&self) -> &Schema {
+ &self.projected_schema
+ }
+
+ pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
+ self.projected_output_ordering.clone()
+ }
+
+ pub fn is_infinite(&self) -> bool {
+ self.infinite
+ }
}
impl std::fmt::Debug for StreamingTableExec {