You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/28 15:02:19 UTC
[arrow-datafusion] branch master updated: add `with_new_inputs` (#4393)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new dafd95728 add `with_new_inputs` (#4393)
dafd95728 is described below
commit dafd957288371597902c6af9df3f31a31ac418a2
Author: jakevin <ja...@gmail.com>
AuthorDate: Mon Nov 28 23:02:13 2022 +0800
add `with_new_inputs` (#4393)
---
datafusion/expr/src/logical_plan/plan.rs | 9 ++++++++-
datafusion/optimizer/src/limit_push_down.rs | 15 +++++++--------
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 82e44986a..ed5711a7f 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -21,7 +21,7 @@ use crate::logical_plan::builder::validate_unique_names;
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::utils::{
- exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist,
+ exprlist_to_fields, from_plan, grouping_set_expr_count, grouping_set_to_exprlist,
};
use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -349,6 +349,13 @@ impl LogicalPlan {
self.accept(&mut visitor)?;
Ok(visitor.using_columns)
}
+
+ pub fn with_new_inputs(
+ &self,
+ inputs: &[LogicalPlan],
+ ) -> Result<LogicalPlan, DataFusionError> {
+ from_plan(self, &self.expressions(), inputs)
+ }
}
/// Trait that implements the [Visitor
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 9dbda4653..28b868fd6 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -19,7 +19,6 @@
//! It will push down through projection, limits (taking the smaller limit)
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
-use datafusion_expr::utils::from_plan;
use datafusion_expr::{
logical_plan::{
Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
@@ -131,7 +130,7 @@ impl OptimizerRule for LimitPushDown {
fetch: scan.fetch.map(|x| std::cmp::min(x, limit)).or(Some(limit)),
projected_schema: scan.projected_schema.clone(),
});
- from_plan(plan, &plan.expressions(), &[new_input])?
+ plan.with_new_inputs(&[new_input])?
}
LogicalPlan::Projection(projection) => {
@@ -164,7 +163,7 @@ impl OptimizerRule for LimitPushDown {
inputs: new_inputs,
schema: union.schema.clone(),
});
- from_plan(plan, &plan.expressions(), &[union])?
+ plan.with_new_inputs(&[union])?
}
LogicalPlan::CrossJoin(cross_join) => {
@@ -180,12 +179,12 @@ impl OptimizerRule for LimitPushDown {
fetch: Some(fetch + skip),
input: Arc::new(right.clone()),
});
- let new_input = LogicalPlan::CrossJoin(CrossJoin {
+ let new_cross_join = LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(new_left),
right: Arc::new(new_right),
schema: plan.schema().clone(),
});
- from_plan(plan, &plan.expressions(), &[new_input])?
+ plan.with_new_inputs(&[new_cross_join])?
}
LogicalPlan::Join(join) => {
@@ -195,19 +194,19 @@ impl OptimizerRule for LimitPushDown {
JoinType::Right => push_down_join(join, None, Some(limit)),
_ => push_down_join(join, None, None),
};
- from_plan(plan, &plan.expressions(), &[new_join])?
+ plan.with_new_inputs(&[new_join])?
}
LogicalPlan::Sort(sort) => {
let sort_fetch = skip + fetch;
- let new_input = LogicalPlan::Sort(Sort {
+ let new_sort = LogicalPlan::Sort(Sort {
expr: sort.expr.clone(),
input: Arc::new((*sort.input).clone()),
fetch: Some(
sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch),
),
});
- from_plan(plan, &plan.expressions(), &[new_input])?
+ plan.with_new_inputs(&[new_sort])?
}
_ => plan.clone(),
};