You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 13:40:10 UTC
[arrow-datafusion] branch master updated: update planner (#751)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6f9681d update planner (#751)
6f9681d is described below
commit 6f9681d5a5657e330812404b5fb98a2df9205659
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Mon Jul 19 21:39:36 2021 +0800
update planner (#751)
---
datafusion/src/optimizer/limit_push_down.rs | 57 +++++++++++++++++++++++++----
1 file changed, 49 insertions(+), 8 deletions(-)
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index 21b82a6..37c95a4 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -17,13 +17,13 @@
//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
-use std::sync::Arc;
-
use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;
+use std::sync::Arc;
+use utils::optimize_explain;
/// Optimization rule that tries to push down LIMIT n
/// where applicable to reduce the amount of scanned / processed data
@@ -37,16 +37,42 @@ impl LimitPushDown {
}
fn limit_push_down(
+ optimizer: &LimitPushDown,
upper_limit: Option<usize>,
plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
+ (
+ LogicalPlan::Explain {
+ verbose,
+ schema,
+ plan,
+ stringified_plans,
+ },
+ _,
+ ) => {
+ let schema = schema.as_ref().to_owned().into();
+ optimize_explain(
+ optimizer,
+ *verbose,
+ plan,
+ stringified_plans,
+ &schema,
+ execution_props,
+ )
+ }
(LogicalPlan::Limit { n, input }, upper_limit) => {
let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
Ok(LogicalPlan::Limit {
n: smallest,
// push down limit to plan (minimum of upper limit and current limit)
- input: Arc::new(limit_push_down(Some(smallest), input.as_ref())?),
+ input: Arc::new(limit_push_down(
+ optimizer,
+ Some(smallest),
+ input.as_ref(),
+ execution_props,
+ )?),
})
}
(
@@ -80,7 +106,12 @@ fn limit_push_down(
// Push down limit directly (projection doesn't change number of rows)
Ok(LogicalPlan::Projection {
expr: expr.clone(),
- input: Arc::new(limit_push_down(upper_limit, input.as_ref())?),
+ input: Arc::new(limit_push_down(
+ optimizer,
+ upper_limit,
+ input.as_ref(),
+ execution_props,
+ )?),
schema: schema.clone(),
})
}
@@ -98,7 +129,12 @@ fn limit_push_down(
.map(|x| {
Ok(LogicalPlan::Limit {
n: upper_limit,
- input: Arc::new(limit_push_down(Some(upper_limit), x)?),
+ input: Arc::new(limit_push_down(
+ optimizer,
+ Some(upper_limit),
+ x,
+ execution_props,
+ )?),
})
})
.collect::<Result<_>>()?;
@@ -117,7 +153,7 @@ fn limit_push_down(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| limit_push_down(None, plan))
+ .map(|plan| limit_push_down(optimizer, None, plan, execution_props))
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
@@ -126,14 +162,19 @@ fn limit_push_down(
}
impl OptimizerRule for LimitPushDown {
- fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
- limit_push_down(None, plan)
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
+ limit_push_down(self, None, plan, execution_props)
}
fn name(&self) -> &str {
"limit_push_down"
}
}
+
#[cfg(test)]
mod test {
use super::*;