You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by oz...@apache.org on 2023/11/27 06:21:32 UTC

(arrow-datafusion) branch main updated: Projection Pushdown over StreamingTableExec (#8299)

This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d81c961ccb Projection Pushdown over StreamingTableExec (#8299)
d81c961ccb is described below

commit d81c961ccb301bf12fba002474c3d2092d66d032
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Mon Nov 27 09:21:26 2023 +0300

    Projection Pushdown over StreamingTableExec (#8299)
    
    * Projection above streaming table can be removed
    
    * Review
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../src/physical_optimizer/projection_pushdown.rs  | 186 +++++++++++++++++++--
 datafusion/physical-plan/src/streaming.rs          |  29 +++-
 2 files changed, 199 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 74d0de507e..c0e512ffe5 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -20,6 +20,8 @@
 //! projections one by one if the operator below is amenable to this. If a
 //! projection reaches a source, it can even dissappear from the plan entirely.
 
+use std::sync::Arc;
+
 use super::output_requirements::OutputRequirementExec;
 use super::PhysicalOptimizerRule;
 use crate::datasource::physical_plan::CsvExec;
@@ -39,7 +41,6 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::{Distribution, ExecutionPlan};
 
 use arrow_schema::SchemaRef;
-
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::JoinSide;
@@ -47,10 +48,10 @@ use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{
     Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
 };
+use datafusion_physical_plan::streaming::StreamingTableExec;
 use datafusion_physical_plan::union::UnionExec;
 
 use itertools::Itertools;
-use std::sync::Arc;
 
 /// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
 /// remove or swap with its child.
@@ -135,6 +136,8 @@ pub fn remove_unnecessary_projections(
             try_swapping_with_sort_merge_join(projection, sm_join)?
         } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
             try_swapping_with_sym_hash_join(projection, sym_join)?
+        } else if let Some(ste) = input.downcast_ref::<StreamingTableExec>() {
+            try_swapping_with_streaming_table(projection, ste)?
         } else {
             // If the input plan of the projection is not one of the above, we
             // conservatively assume that pushing the projection down may hurt.
@@ -149,8 +152,8 @@ pub fn remove_unnecessary_projections(
     Ok(maybe_modified.map_or(Transformed::No(plan), Transformed::Yes))
 }
 
-/// Tries to swap `projection` with its input (`csv`). If possible, performs
-/// the swap and returns [`CsvExec`] as the top plan. Otherwise, returns `None`.
+/// Tries to embed `projection` into its input (`csv`). If possible, returns
+/// [`CsvExec`] as the top plan. Otherwise, returns `None`.
 fn try_swapping_with_csv(
     projection: &ProjectionExec,
     csv: &CsvExec,
@@ -174,8 +177,8 @@ fn try_swapping_with_csv(
     })
 }
 
-/// Tries to swap `projection` with its input (`memory`). If possible, performs
-/// the swap and returns [`MemoryExec`] as the top plan. Otherwise, returns `None`.
+/// Tries to embed `projection` into its input (`memory`). If possible, returns
+/// [`MemoryExec`] as the top plan. Otherwise, returns `None`.
 fn try_swapping_with_memory(
     projection: &ProjectionExec,
     memory: &MemoryExec,
@@ -197,10 +200,52 @@ fn try_swapping_with_memory(
         .transpose()
 }
 
+/// Tries to embed `projection` into its input (`streaming_table`).
+/// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
+/// returns `None`.
+fn try_swapping_with_streaming_table(
+    projection: &ProjectionExec,
+    streaming_table: &StreamingTableExec,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    if !all_alias_free_columns(projection.expr()) {
+        return Ok(None);
+    }
+
+    let streaming_table_projections = streaming_table
+        .projection()
+        .as_ref()
+        .map(|i| i.as_ref().to_vec());
+    let new_projections =
+        new_projections_for_columns(projection, &streaming_table_projections);
+
+    let mut lex_orderings = vec![];
+    for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
+        let mut orderings = vec![];
+        for order in lex_ordering {
+            let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)?
+            else {
+                return Ok(None);
+            };
+            orderings.push(PhysicalSortExpr {
+                expr: new_ordering,
+                options: order.options,
+            });
+        }
+        lex_orderings.push(orderings);
+    }
+
+    StreamingTableExec::try_new(
+        streaming_table.partition_schema().clone(),
+        streaming_table.partitions().clone(),
+        Some(&new_projections),
+        lex_orderings,
+        streaming_table.is_infinite(),
+    )
+    .map(|e| Some(Arc::new(e) as _))
+}
+
 /// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
-/// Two consecutive projections can always merge into a single projection unless
-/// the [`update_expr`] function does not support one of the expression
-/// types involved in the projection.
+/// Two consecutive projections can always merge into a single projection.
 fn try_unifying_projections(
     projection: &ProjectionExec,
     child: &ProjectionExec,
@@ -779,10 +824,6 @@ fn new_projections_for_columns(
 ///    given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with
 ///    an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
 ///    `a@0`, but `b@2` results in `None` since the projection does not include `b`.
-///
-/// If the expression contains a `PhysicalExpr` variant that this function does
-/// not support, it will return `None`. An error can only be introduced if
-/// `CaseExpr::try_new` returns an error.
 fn update_expr(
     expr: &Arc<dyn PhysicalExpr>,
     projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
@@ -1102,10 +1143,11 @@ mod tests {
     use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::ExecutionPlan;
 
-    use arrow_schema::{DataType, Field, Schema, SortOptions};
+    use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics};
     use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
     use datafusion_expr::{ColumnarValue, Operator};
     use datafusion_physical_expr::expressions::{
         BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
@@ -1115,8 +1157,11 @@ mod tests {
         PhysicalSortRequirement, ScalarFunctionExpr,
     };
     use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+    use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
     use datafusion_physical_plan::union::UnionExec;
 
+    use itertools::Itertools;
+
     #[test]
     fn test_update_matching_exprs() -> Result<()> {
         let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
@@ -1575,6 +1620,119 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_streaming_table_after_projection() -> Result<()> {
+        struct DummyStreamPartition {
+            schema: SchemaRef,
+        }
+        impl PartitionStream for DummyStreamPartition {
+            fn schema(&self) -> &SchemaRef {
+                &self.schema
+            }
+            fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+                unreachable!()
+            }
+        }
+
+        let streaming_table = StreamingTableExec::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int32, true),
+                Field::new("b", DataType::Int32, true),
+                Field::new("c", DataType::Int32, true),
+                Field::new("d", DataType::Int32, true),
+                Field::new("e", DataType::Int32, true),
+            ])),
+            vec![Arc::new(DummyStreamPartition {
+                schema: Arc::new(Schema::new(vec![
+                    Field::new("a", DataType::Int32, true),
+                    Field::new("b", DataType::Int32, true),
+                    Field::new("c", DataType::Int32, true),
+                    Field::new("d", DataType::Int32, true),
+                    Field::new("e", DataType::Int32, true),
+                ])),
+            }) as _],
+            Some(&vec![0_usize, 2, 4, 3]),
+            vec![
+                vec![
+                    PhysicalSortExpr {
+                        expr: Arc::new(Column::new("e", 2)),
+                        options: SortOptions::default(),
+                    },
+                    PhysicalSortExpr {
+                        expr: Arc::new(Column::new("a", 0)),
+                        options: SortOptions::default(),
+                    },
+                ],
+                vec![PhysicalSortExpr {
+                    expr: Arc::new(Column::new("d", 3)),
+                    options: SortOptions::default(),
+                }],
+            ]
+            .into_iter(),
+            true,
+        )?;
+        let projection = Arc::new(ProjectionExec::try_new(
+            vec![
+                (Arc::new(Column::new("d", 3)), "d".to_string()),
+                (Arc::new(Column::new("e", 2)), "e".to_string()),
+                (Arc::new(Column::new("a", 0)), "a".to_string()),
+            ],
+            Arc::new(streaming_table) as _,
+        )?) as _;
+
+        let after_optimize =
+            ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+        let result = after_optimize
+            .as_any()
+            .downcast_ref::<StreamingTableExec>()
+            .unwrap();
+        assert_eq!(
+            result.partition_schema(),
+            &Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int32, true),
+                Field::new("b", DataType::Int32, true),
+                Field::new("c", DataType::Int32, true),
+                Field::new("d", DataType::Int32, true),
+                Field::new("e", DataType::Int32, true),
+            ]))
+        );
+        assert_eq!(
+            result.projection().clone().unwrap().to_vec(),
+            vec![3_usize, 4, 0]
+        );
+        assert_eq!(
+            result.projected_schema(),
+            &Schema::new(vec![
+                Field::new("d", DataType::Int32, true),
+                Field::new("e", DataType::Int32, true),
+                Field::new("a", DataType::Int32, true),
+            ])
+        );
+        assert_eq!(
+            result.projected_output_ordering().into_iter().collect_vec(),
+            vec![
+                vec![
+                    PhysicalSortExpr {
+                        expr: Arc::new(Column::new("e", 1)),
+                        options: SortOptions::default(),
+                    },
+                    PhysicalSortExpr {
+                        expr: Arc::new(Column::new("a", 2)),
+                        options: SortOptions::default(),
+                    },
+                ],
+                vec![PhysicalSortExpr {
+                    expr: Arc::new(Column::new("d", 0)),
+                    options: SortOptions::default(),
+                }],
+            ]
+        );
+        assert!(result.is_infinite());
+
+        Ok(())
+    }
+
     #[test]
     fn test_projection_after_projection() -> Result<()> {
         let csv = create_simple_csv_exec();
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index b0eaa2b42f..59819c6921 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -26,6 +26,7 @@ use crate::stream::RecordBatchStreamAdapter;
 use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
 
 use arrow::datatypes::SchemaRef;
+use arrow_schema::Schema;
 use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
@@ -70,9 +71,9 @@ impl StreamingTableExec {
     ) -> Result<Self> {
         for x in partitions.iter() {
             let partition_schema = x.schema();
-            if !schema.contains(partition_schema) {
+            if !schema.eq(partition_schema) {
                 debug!(
-                    "target schema does not contain partition schema. \
+                    "Target schema does not match with partition schema. \
                         Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
                 );
                 return plan_err!("Mismatch between schema and batches");
@@ -92,6 +93,30 @@ impl StreamingTableExec {
             infinite,
         })
     }
+
+    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
+        &self.partitions
+    }
+
+    pub fn partition_schema(&self) -> &SchemaRef {
+        self.partitions[0].schema()
+    }
+
+    pub fn projection(&self) -> &Option<Arc<[usize]>> {
+        &self.projection
+    }
+
+    pub fn projected_schema(&self) -> &Schema {
+        &self.projected_schema
+    }
+
+    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
+        self.projected_output_ordering.clone()
+    }
+
+    pub fn is_infinite(&self) -> bool {
+        self.infinite
+    }
 }
 
 impl std::fmt::Debug for StreamingTableExec {