You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 13:40:10 UTC

[arrow-datafusion] branch master updated: update planner (#751)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f9681d  update planner (#751)
6f9681d is described below

commit 6f9681d5a5657e330812404b5fb98a2df9205659
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Mon Jul 19 21:39:36 2021 +0800

    update planner (#751)
---
 datafusion/src/optimizer/limit_push_down.rs | 57 +++++++++++++++++++++++++----
 1 file changed, 49 insertions(+), 8 deletions(-)

diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index 21b82a6..37c95a4 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -17,13 +17,13 @@
 
 //! Optimizer rule to push down LIMIT in the query plan
 //! It will push down through projection, limits (taking the smaller limit)
-use std::sync::Arc;
-
 use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::LogicalPlan;
 use crate::optimizer::optimizer::OptimizerRule;
+use std::sync::Arc;
+use utils::optimize_explain;
 
 /// Optimization rule that tries pushes down LIMIT n
 /// where applicable to reduce the amount of scanned / processed data
@@ -37,16 +37,42 @@ impl LimitPushDown {
 }
 
 fn limit_push_down(
+    optimizer: &LimitPushDown,
     upper_limit: Option<usize>,
     plan: &LogicalPlan,
+    execution_props: &ExecutionProps,
 ) -> Result<LogicalPlan> {
     match (plan, upper_limit) {
+        (
+            LogicalPlan::Explain {
+                verbose,
+                schema,
+                plan,
+                stringified_plans,
+            },
+            _,
+        ) => {
+            let schema = schema.as_ref().to_owned().into();
+            optimize_explain(
+                optimizer,
+                *verbose,
+                plan,
+                stringified_plans,
+                &schema,
+                execution_props,
+            )
+        }
         (LogicalPlan::Limit { n, input }, upper_limit) => {
             let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
             Ok(LogicalPlan::Limit {
                 n: smallest,
                 // push down limit to plan (minimum of upper limit and current limit)
-                input: Arc::new(limit_push_down(Some(smallest), input.as_ref())?),
+                input: Arc::new(limit_push_down(
+                    optimizer,
+                    Some(smallest),
+                    input.as_ref(),
+                    execution_props,
+                )?),
             })
         }
         (
@@ -80,7 +106,12 @@ fn limit_push_down(
             // Push down limit directly (projection doesn't change number of rows)
             Ok(LogicalPlan::Projection {
                 expr: expr.clone(),
-                input: Arc::new(limit_push_down(upper_limit, input.as_ref())?),
+                input: Arc::new(limit_push_down(
+                    optimizer,
+                    upper_limit,
+                    input.as_ref(),
+                    execution_props,
+                )?),
                 schema: schema.clone(),
             })
         }
@@ -98,7 +129,12 @@ fn limit_push_down(
                 .map(|x| {
                     Ok(LogicalPlan::Limit {
                         n: upper_limit,
-                        input: Arc::new(limit_push_down(Some(upper_limit), x)?),
+                        input: Arc::new(limit_push_down(
+                            optimizer,
+                            Some(upper_limit),
+                            x,
+                            execution_props,
+                        )?),
                     })
                 })
                 .collect::<Result<_>>()?;
@@ -117,7 +153,7 @@ fn limit_push_down(
             let inputs = plan.inputs();
             let new_inputs = inputs
                 .iter()
-                .map(|plan| limit_push_down(None, plan))
+                .map(|plan| limit_push_down(optimizer, None, plan, execution_props))
                 .collect::<Result<Vec<_>>>()?;
 
             utils::from_plan(plan, &expr, &new_inputs)
@@ -126,14 +162,19 @@ fn limit_push_down(
 }
 
 impl OptimizerRule for LimitPushDown {
-    fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
-        limit_push_down(None, plan)
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        limit_push_down(self, None, plan, execution_props)
     }
 
     fn name(&self) -> &str {
         "limit_push_down"
     }
 }
+
 #[cfg(test)]
 mod test {
     use super::*;