Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/23 18:04:07 UTC

[GitHub] [arrow] andygrove opened a new pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

andygrove opened a new pull request #8034:
URL: https://github.com/apache/arrow/pull/8034


   This PR adds the first physical optimization rule, which inserts explicit MergeExec nodes into the physical plan when an operator requires a single partition of input (such as GlobalLimitExec, SortExec, or HashAggregateExec in final mode).
   
   This removes the merging logic from the operators, making them easier to reuse in different contexts (such as a distributed query engine, which could provide its own planner and/or optimization rules).
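To illustrate what an explicit merge step does at execution time, here is a minimal standard-library sketch that drains every input partition on its own thread and funnels the batches into a single output partition. It is not DataFusion's `MergeExec`: batches are modelled as `Vec<i64>`, and `Batch`, `PartitionedInput`, and `merge_partitions` are hypothetical names chosen for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a record batch.
pub type Batch = Vec<i64>;

/// Hypothetical partitioned input: partition `i` yields the batches in `partitions[i]`.
pub struct PartitionedInput {
    pub partitions: Vec<Vec<Batch>>,
}

/// Merge all input partitions into one by draining each partition on its own
/// thread and forwarding every batch over a shared channel.
pub fn merge_partitions(input: PartitionedInput) -> Vec<Batch> {
    let (tx, rx) = mpsc::channel::<Batch>();
    let mut handles = Vec::new();
    for partition in input.partitions {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for batch in partition {
                // `rx` is not dropped until every thread has finished, so send cannot fail.
                tx.send(batch).expect("receiver dropped");
            }
        }));
    }
    // Drop the original sender so `rx.iter()` ends once every thread is done.
    drop(tx);
    let merged: Vec<Batch> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("partition thread panicked");
    }
    merged
}
```

Because the threads race, batch order in the merged output is not deterministic; an operator that needs an ordering (such as SortExec) still has to impose it after the merge.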


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r475253287



##########
File path: rust/datafusion/src/execution/physical_plan/mod.rs
##########
@@ -74,6 +85,15 @@ impl Partitioning {
     }
 }
 
+/// Distribution schemes
+#[derive(Debug, Clone)]
+pub enum Distribution {
+    /// Unspecified distribution

Review comment:
       Maybe `/// no distribution is required`?

##########
File path: rust/datafusion/src/execution/physical_plan/mod.rs
##########
@@ -50,6 +50,17 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn schema(&self) -> SchemaRef;
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
+    /// Specifies the data distribution requirements of all the children for this operator
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+    /// Get the children of this plan

Review comment:
       Suggestion for docs:
   
   ```
   /// `children` of this plan. This corresponds to all plans that this plan depends on. 
   /// This function should return an empty vector for `scans`.
   ```
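The contract being documented here (`children` lists every plan that feeds this one and returns an empty vector for leaf nodes such as scans) is what lets generic code walk any plan without knowing the concrete operator types. A small sketch, using a hypothetical `PlanNode` trait as a cut-down stand-in for `ExecutionPlan`, hypothetical `Scan` and `Union` nodes, and a hypothetical `leaf_names` helper:

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Hypothetical cut-down stand-in for `ExecutionPlan`, keeping only what a tree walk needs.
pub trait PlanNode: Debug {
    fn name(&self) -> String;
    /// All plans that provide input to this plan; empty for leaf nodes (scans).
    fn children(&self) -> Vec<Arc<dyn PlanNode>>;
}

/// Hypothetical leaf node reading one file.
#[derive(Debug)]
pub struct Scan {
    pub file: String,
}

/// Hypothetical node with any number of inputs.
#[derive(Debug)]
pub struct Union {
    pub inputs: Vec<Arc<dyn PlanNode>>,
}

impl PlanNode for Scan {
    fn name(&self) -> String {
        format!("Scan({})", self.file)
    }
    fn children(&self) -> Vec<Arc<dyn PlanNode>> {
        vec![] // leaf node: no inputs
    }
}

impl PlanNode for Union {
    fn name(&self) -> String {
        "Union".to_string()
    }
    fn children(&self) -> Vec<Arc<dyn PlanNode>> {
        self.inputs.clone()
    }
}

/// Collect the names of all leaf nodes, relying only on `children()`.
pub fn leaf_names(plan: &Arc<dyn PlanNode>) -> Vec<String> {
    let children = plan.children();
    if children.is_empty() {
        return vec![plan.name()];
    }
    children.iter().flat_map(leaf_names).collect()
}
```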

##########
File path: rust/datafusion/src/execution/physical_plan/mod.rs
##########
@@ -50,6 +50,17 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn schema(&self) -> SchemaRef;
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
+    /// Specifies the data distribution requirements of all the children for this operator
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+    /// Get the children of this plan
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
+    /// Replace the children of this execution plan
+    fn with_new_children(

Review comment:
       ```
   /// Returns a new plan where all children were replaced by new plans. 
   /// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
   ```







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r476553596



##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -61,6 +61,55 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+        self.optimize_plan(plan, ctx_state)
+    }
+}
+
+impl DefaultPhysicalPlanner {
+    /// Optimize a physical plan by inserting MergeExec where a single partition is required
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize_plan(child.clone(), ctx_state))
+            .collect::<Result<Vec<_>>>()?;
+
+        if children.len() == 0 {
+            // leaf node, children cannot be replaced
+            Ok(plan.clone())
+        } else {
+            match plan.required_child_distribution() {
+                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
+                Distribution::SinglePartition => plan.with_new_children(

Review comment:
       I think that you get extra credits for that (stealing and giving credit) 👍 







[GitHub] [arrow] andygrove closed pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8034:
URL: https://github.com/apache/arrow/pull/8034


   





[GitHub] [arrow] andygrove commented on pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#issuecomment-678805566


   @alamb @jorgecarleitao I'm pretty excited about this PR. This is a good example of how we can write optimizer rules against a trait-based plan.





[GitHub] [arrow] andygrove commented on a change in pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r476484428



##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -153,33 +202,28 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
 
                 let initial_aggr = HashAggregateExec::try_new(
+                    AggregateMode::Partial,

Review comment:
       Yes, that's a great point. 







[GitHub] [arrow] jorgecarleitao commented on pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#issuecomment-678807425


   It looks fantastic! Super excited to see this!
   
   Gave it a quick look, and my understanding so far:
   
   * Physical nodes have requirements (`required_child_distribution`)
   * Physical nodes can fulfill requirements (`output_partitioning`)
   * The physical optimizer knows which physical nodes fulfill certain requirements (Merge fulfills `Distribution::SinglePartition` so far), and introduces them to fulfill a node's requirements, by using `children` to get the node's children and `with_new_children` to re-write the node with new children (that fulfill requirements).
   
   Is this a good overview of this PR (before going into the specifics)?
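The mechanism summarized above can be sketched end to end. This is a minimal, self-contained model rather than the PR's code: the trait is cut down to the methods discussed, `Partitioning` has only the `UnknownPartitioning` variant, errors are plain `String`s, and `ScanExec` and the `single_child` helper are hypothetical. Only the `SinglePartition` requirement is handled, since that is the one this PR introduces.

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Assumption: errors are plain strings rather than DataFusion's ExecutionError.
pub type Result<T> = std::result::Result<T, String>;

/// Simplified output partitioning: only the partition count is known.
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning { UnknownPartitioning(usize) }

/// Requirement a parent places on its children's output.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution { UnspecifiedDistribution, SinglePartition }

/// Cut-down version of the trait methods added in this PR.
pub trait ExecutionPlan: Debug {
    fn output_partitioning(&self) -> Partitioning;
    fn required_child_distribution(&self) -> Distribution {
        Distribution::UnspecifiedDistribution
    }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>>;
}

/// Hypothetical leaf with `partitions` output partitions (e.g. one per file).
#[derive(Debug)]
pub struct ScanExec { pub partitions: usize }
/// Needs all of its input in one partition.
#[derive(Debug)]
pub struct GlobalLimitExec { pub input: Arc<dyn ExecutionPlan> }
/// Combines all input partitions into one.
#[derive(Debug)]
pub struct MergeExec { pub input: Arc<dyn ExecutionPlan> }

/// Hypothetical helper for unary nodes: exactly one replacement child.
fn single_child(c: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
    match c.len() {
        1 => Ok(c[0].clone()),
        n => Err(format!("expected 1 child, got {}", n)),
    }
}

impl ExecutionPlan for ScanExec {
    fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(self.partitions) }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![] } // leaf node
    fn with_new_children(&self, _: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
        Err("ScanExec has no children to replace".to_string())
    }
}

impl ExecutionPlan for GlobalLimitExec {
    fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) }
    fn required_child_distribution(&self) -> Distribution { Distribution::SinglePartition }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.input.clone()] }
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(GlobalLimitExec { input: single_child(c)? }))
    }
}

impl ExecutionPlan for MergeExec {
    fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.input.clone()] }
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MergeExec { input: single_child(c)? }))
    }
}

/// Bottom-up rewrite: optimize the children first, then wrap any child with
/// more than one partition in a MergeExec if this node needs SinglePartition.
pub fn optimize_plan(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let children = plan
        .children()
        .into_iter()
        .map(optimize_plan)
        .collect::<Result<Vec<_>>>()?;
    if children.is_empty() {
        return Ok(plan); // leaf node: nothing to replace
    }
    let children: Vec<Arc<dyn ExecutionPlan>> = match plan.required_child_distribution() {
        Distribution::UnspecifiedDistribution => children,
        Distribution::SinglePartition => children
            .into_iter()
            .map(|child| {
                let Partitioning::UnknownPartitioning(n) = child.output_partitioning();
                if n == 1 {
                    child
                } else {
                    Arc::new(MergeExec { input: child }) as Arc<dyn ExecutionPlan>
                }
            })
            .collect(),
    };
    plan.with_new_children(children)
}
```

Because a child that already reports one partition is left alone, running this sketch twice does not stack MergeExec nodes.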





[GitHub] [arrow] andygrove commented on pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#issuecomment-679426900


   Thanks for the reviews @alamb and @jorgecarleitao. I have implemented the suggested changes and also improved the error handling so that we return `Err` results rather than panic if the partitioning is incorrect at execution time.
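Returning an error instead of panicking when the input partitioning is wrong might look like the following minimal sketch. `ExecError`, `GlobalLimit`, and the `Vec<i64>` input are hypothetical simplifications (the real operators return record-batch readers), and the exact checks made in the PR may differ.

```rust
/// Hypothetical error type standing in for DataFusion's `ExecutionError`.
#[derive(Debug, PartialEq)]
pub enum ExecError {
    General(String),
}

/// Hypothetical single-partition operator that validates its input
/// partitioning before doing any work.
pub struct GlobalLimit {
    pub input_partitions: usize,
    pub limit: usize,
}

impl GlobalLimit {
    pub fn execute(&self, partition: usize, input: Vec<i64>) -> Result<Vec<i64>, ExecError> {
        // A mis-planned input (e.g. a missing MergeExec) becomes an Err, not a panic.
        if self.input_partitions != 1 {
            return Err(ExecError::General(format!(
                "GlobalLimit requires a single input partition, got {}",
                self.input_partitions
            )));
        }
        // This operator produces exactly one output partition: index 0.
        if partition != 0 {
            return Err(ExecError::General(format!("invalid partition {}", partition)));
        }
        Ok(input.into_iter().take(self.limit).collect())
    }
}
```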





[GitHub] [arrow] andygrove commented on a change in pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r476475961



##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -61,6 +61,55 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+        self.optimize_plan(plan, ctx_state)
+    }
+}
+
+impl DefaultPhysicalPlanner {
+    /// Optimize a physical plan by inserting MergeExec where a single partition is required
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize_plan(child.clone(), ctx_state))
+            .collect::<Result<Vec<_>>>()?;
+
+        if children.len() == 0 {
+            // leaf node, children cannot be replaced
+            Ok(plan.clone())
+        } else {
+            match plan.required_child_distribution() {
+                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
+                Distribution::SinglePartition => plan.with_new_children(

Review comment:
       I can't take credit for the cleverness unfortunately, since I stole all of this from Apache Spark. 







[GitHub] [arrow] github-actions[bot] commented on pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#issuecomment-678805666


   https://issues.apache.org/jira/browse/ARROW-9464





[GitHub] [arrow] alamb commented on a change in pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r475537669



##########
File path: rust/datafusion/src/execution/physical_plan/mod.rs
##########
@@ -74,6 +85,15 @@ impl Partitioning {
     }
 }
 
+/// Distribution schemes
+#[derive(Debug, Clone)]
+pub enum Distribution {
+    /// Unspecified distribution

Review comment:
       I think "unspecified" is more accurate -- specifically, the data is distributed across all the partitions, but the optimizer/planner doesn't know *how* it is distributed.
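Both readings can hold at once: a requirement of `UnspecifiedDistribution` is satisfied by any partitioning, precisely because nothing is assumed about how rows are spread. A minimal sketch of that relationship, where the `satisfies` function is hypothetical and `Partitioning` is reduced to the `UnknownPartitioning` variant used by `ParquetExec` in this diff:

```rust
/// How an operator's output is partitioned (simplified: only the count is known).
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    /// `n` partitions, with no knowledge of how rows are assigned to them.
    UnknownPartitioning(usize),
}

/// What an operator requires of its children's output.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    /// No distribution is required: data may be spread across any number of
    /// partitions in an unknown way.
    UnspecifiedDistribution,
    /// All data must be in a single partition.
    SinglePartition,
}

/// Hypothetical check: does `output` already meet `required`?
pub fn satisfies(output: &Partitioning, required: &Distribution) -> bool {
    match (output, required) {
        (_, Distribution::UnspecifiedDistribution) => true,
        (Partitioning::UnknownPartitioning(n), Distribution::SinglePartition) => *n == 1,
    }
}
```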

##########
File path: rust/datafusion/src/execution/physical_plan/parquet.rs
##########
@@ -91,11 +91,26 @@ impl ExecutionPlan for ParquetExec {
         self.schema.clone()
     }
 
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        // this is a leaf node and has no children
+        vec![]
+    }
+
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(self.filenames.len())
     }
 
+    fn with_new_children(
+        &self,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(ExecutionError::General(format!(

Review comment:
       This is fine for this PR, but I think this implementation could actually check that `children` is empty and then return `self.clone()`.
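A sketch of the suggested leaf-node behaviour, using a hypothetical `ParquetScan` struct and a cut-down `ExecutionPlan` trait with `String` errors (not DataFusion's `ExecutionError`). Because the trait returns `Arc<dyn ExecutionPlan>`, returning "self" in practice means wrapping a clone of the struct in a new `Arc`, which is why the struct derives `Clone` here.

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Cut-down, hypothetical version of the trait.
pub trait ExecutionPlan: Debug {
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, String>;
}

/// Hypothetical leaf node modelled on `ParquetExec`.
#[derive(Debug, Clone)]
pub struct ParquetScan {
    pub filenames: Vec<String>,
}

impl ExecutionPlan for ParquetScan {
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![] // leaf node: no inputs
    }

    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, String> {
        if children.is_empty() {
            // Replacing "no children" with "no children" is a no-op.
            Ok(Arc::new(self.clone()))
        } else {
            Err(format!("ParquetScan is a leaf node but got {} children", children.len()))
        }
    }
}
```

With this shape, an optimizer can call `with_new_children` uniformly on every node instead of special-casing leaves.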

##########
File path: rust/datafusion/src/execution/physical_plan/mod.rs
##########
@@ -50,6 +50,17 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn schema(&self) -> SchemaRef;
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
+    /// Specifies the data distribution requirements of all the children for this operator
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+    /// Get the children of this plan

Review comment:
       Or maybe
   
   ```
   /// `children` of this plan. This corresponds to all plans that provide input to this plan. 
   /// This function should return an empty vector for leaf nodes such as `scans`.
   ```







[GitHub] [arrow] andygrove edited a comment on pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#issuecomment-679426900


   Thanks for the reviews @alamb and @jorgecarleitao. I have implemented the suggested changes and have also improved the error handling so that we return `Err` results rather than panic if the partitioning is incorrect at execution time.





[GitHub] [arrow] alamb commented on a change in pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r476449126



##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -61,6 +61,55 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+        self.optimize_plan(plan, ctx_state)
+    }
+}
+
+impl DefaultPhysicalPlanner {
+    /// Optimize a physical plan by inserting MergeExec where a single partition is required
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize_plan(child.clone(), ctx_state))
+            .collect::<Result<Vec<_>>>()?;
+
+        if children.len() == 0 {
+            // leaf node, children cannot be replaced
+            Ok(plan.clone())
+        } else {
+            match plan.required_child_distribution() {
+                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
+                Distribution::SinglePartition => plan.with_new_children(

Review comment:
       This is clever -- I like it. 👍 

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -111,6 +123,17 @@ impl ExecutionPlan for HashAggregateExec {
         self.schema.clone()
     }
 
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        match &self.mode {
+            AggregateMode::Partial => Distribution::UnspecifiedDistribution,

Review comment:
       I would think the output distribution is likely the same as the input distribution if the input distribution is on the grouping keys. The only possible change here is that if the mode is `Partial` and the input distribution is `SinglePartition`, then so is the output distribution.
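The mode-dependent requirement in this hunk, together with the output behaviour described in this comment, can be sketched as two small functions. `AggregateMode::Final => SinglePartition` follows the PR description ("HashAggregateExec in final mode"); the output-partition rule is an interpretation of this comment rather than code from the PR, and the free-function names are hypothetical.

```rust
/// Simplified aggregate modes from this PR.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggregateMode {
    Partial,
    Final,
}

/// Simplified distribution requirement.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

/// Partial aggregation runs independently in each input partition, so it
/// needs nothing from its input; final aggregation combines partial results
/// and therefore needs them in one partition.
pub fn required_child_distribution(mode: AggregateMode) -> Distribution {
    match mode {
        AggregateMode::Partial => Distribution::UnspecifiedDistribution,
        AggregateMode::Final => Distribution::SinglePartition,
    }
}

/// Output partition count: a partial aggregate keeps its input's partition
/// count (so a single input partition stays single); a final aggregate reads
/// one partition and emits one.
pub fn output_partition_count(mode: AggregateMode, input_partitions: usize) -> usize {
    match mode {
        AggregateMode::Partial => input_partitions,
        AggregateMode::Final => 1,
    }
}
```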

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -121,21 +144,40 @@ impl ExecutionPlan for HashAggregateExec {
         partition: usize,
     ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         let input = self.input.execute(partition)?;
+        let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
+        let aggr_expr = self.aggr_expr.iter().map(|x| x.0.clone()).collect();
         if self.group_expr.is_empty() {
             Ok(Arc::new(Mutex::new(HashAggregateIterator::new(
                 self.schema.clone(),
-                self.aggr_expr.clone(),
+                aggr_expr,
                 input,
             ))))
         } else {
             Ok(Arc::new(Mutex::new(GroupedHashAggregateIterator::new(
                 self.schema.clone(),
-                self.group_expr.clone(),
-                self.aggr_expr.clone(),
+                group_expr,
+                aggr_expr,
                 input,
             ))))
         }
     }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {

Review comment:
       I like this pattern of matching on the number of children.
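The same pattern can also be written with a slice pattern, which binds the single child directly. Either way, the important detail is carrying the operator's other fields (here a hypothetical `group_expr`) over unchanged. A sketch with hypothetical `Source` and `Aggregate` nodes and a cut-down trait using `String` errors:

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Cut-down, hypothetical version of the trait (errors are plain strings).
pub trait ExecutionPlan: Debug {
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, String>;
}

/// Hypothetical leaf node.
#[derive(Debug)]
pub struct Source;

/// Hypothetical unary aggregate with one non-child field.
#[derive(Debug)]
pub struct Aggregate {
    pub input: Arc<dyn ExecutionPlan>,
    pub group_expr: Vec<String>,
}

impl ExecutionPlan for Source {
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }
    fn with_new_children(
        &self,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, String> {
        Err("Source has no children to replace".to_string())
    }
}

impl ExecutionPlan for Aggregate {
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }
    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, String> {
        match children.as_slice() {
            // Exactly one child: rebuild the node, keeping the other fields.
            [child] => Ok(Arc::new(Aggregate {
                input: child.clone(),
                group_expr: self.group_expr.clone(),
            })),
            other => Err(format!("Aggregate expects 1 child, got {}", other.len())),
        }
    }
}
```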

##########
File path: rust/datafusion/src/execution/physical_plan/projection.rs
##########
@@ -32,7 +32,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
 #[derive(Debug)]
 pub struct ProjectionExec {
     /// The projection expressions

Review comment:
       ```suggestion
       /// The projection expressions stored as tuples of (expression, output column name)
   ```
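Storing each projection as an (expression, output column name) tuple keeps the two in step, and code that needs only one half can pull it out by position, as `x.0` does for the group and aggregate expressions in the `HashAggregateExec` hunk earlier in this review. A minimal sketch with a hypothetical `Column` expression that simply indexes into a row and a hypothetical `Projection` struct:

```rust
/// Hypothetical physical expression: reads the column at this index.
#[derive(Debug, Clone, PartialEq)]
pub struct Column(pub usize);

/// Projection stored as (expression, output column name) tuples.
pub struct Projection {
    pub expr: Vec<(Column, String)>,
}

impl Projection {
    /// Output schema names: the second element of each tuple.
    pub fn output_names(&self) -> Vec<String> {
        self.expr.iter().map(|(_, name)| name.clone()).collect()
    }

    /// Evaluate the expressions (first tuple element) against one row.
    pub fn project_row(&self, row: &[i64]) -> Vec<i64> {
        self.expr.iter().map(|(Column(i), _)| row[*i]).collect()
    }
}
```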

##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -153,33 +202,28 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
 
                 let initial_aggr = HashAggregateExec::try_new(
+                    AggregateMode::Partial,

Review comment:
       An improvement for a subsequent PR would be to skip the partial aggregate when the input distribution is already `Distribution::SinglePartition`.
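The idea can be sketched as a planning decision on the input's partition count. Here the single-stage case uses a hypothetical `SingleAggregate` stage rather than `AggregateMode::Final`, on the assumption (as in two-stage aggregation generally) that a final aggregate consumes partial state rather than raw rows. The COUNT helpers show why that distinction matters: the final stage sums partial counts instead of counting rows. All names here are hypothetical.

```rust
/// Hypothetical planning stages for an aggregate, listed bottom to top.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Stage {
    /// Aggregate raw rows within each partition, emitting partial state.
    PartialAggregate,
    /// Combine all partitions into one (what MergeExec provides).
    Merge,
    /// Combine partial state into final results; needs a single partition.
    FinalAggregate,
    /// Hypothetical one-step aggregate over raw rows in a single partition.
    SingleAggregate,
}

/// Skip the partial stage (and the merge) when the input is already a single partition.
pub fn plan_aggregate(input_partitions: usize) -> Vec<Stage> {
    if input_partitions == 1 {
        vec![Stage::SingleAggregate]
    } else {
        vec![Stage::PartialAggregate, Stage::Merge, Stage::FinalAggregate]
    }
}

/// COUNT in two stages: a partial count per partition, then the final stage
/// sums the partial counts.
pub fn count_two_stage(partitions: &[Vec<i64>]) -> usize {
    let partial: Vec<usize> = partitions.iter().map(|p| p.len()).collect();
    partial.iter().sum()
}

/// COUNT in one stage over a single partition of raw rows.
pub fn count_single(rows: &[i64]) -> usize {
    rows.len()
}
```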







[GitHub] [arrow] andygrove commented on pull request #8034: ARROW-9464: [Rust] [DataFusion] Physical plan optimization rule to insert MergeExec when needed

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#issuecomment-678809178


   > It looks fantastic! Super excited to see this!
   > 
   > Gave it a quick look, and my understanding so far:
   > 
   >     * Physical nodes have requirements (`required_child_distribution`)
   > 
   >     * Physical nodes can fulfill requirements (`output_partitioning`)
   > 
   >     * The physical optimizer knows which physical nodes fulfill certain requirements (Merge fulfills `Distribution::SinglePartition` so far), and introduces them to fulfill a node's requirements, by using `children` to get the node's children and `with_new_children` to re-write the node with new children (that fulfill requirements).
   > 
   > 
   > Is this a good overview of this PR (before going into the specifics)?
   
   Yes, I think that's a good overview. 

