You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/27 10:58:14 UTC

[arrow-datafusion] branch master updated: fix 593 (#610)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb195c  fix 593 (#610)
ffb195c is described below

commit ffb195c8634fc537127eff1a082811d28e8fcc2b
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sun Jun 27 18:58:07 2021 +0800

    fix 593 (#610)
---
 .../rust/core/src/serde/logical_plan/from_proto.rs | 18 ++++----
 datafusion/src/execution/context.rs                |  4 +-
 datafusion/src/execution/dataframe_impl.rs         | 22 +++++----
 datafusion/src/logical_plan/builder.rs             | 36 +++++++--------
 datafusion/src/optimizer/constant_folding.rs       | 28 +++++------
 datafusion/src/optimizer/eliminate_limit.rs        |  6 +--
 datafusion/src/optimizer/filter_push_down.rs       | 54 +++++++++++-----------
 datafusion/src/optimizer/hash_build_probe_order.rs |  3 +-
 datafusion/src/optimizer/limit_push_down.rs        | 12 ++---
 datafusion/src/optimizer/projection_push_down.rs   | 28 +++++------
 datafusion/src/optimizer/simplify_expressions.rs   |  4 +-
 datafusion/src/optimizer/utils.rs                  |  2 +-
 datafusion/src/sql/planner.rs                      | 54 +++++++++++-----------
 datafusion/tests/custom_sources.rs                 |  2 +-
 14 files changed, 137 insertions(+), 136 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 15ee507..a1136cf 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -61,14 +61,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .iter()
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .project(x)?
                     .build()
                     .map_err(|e| e.into())
             }
             LogicalPlanType::Selection(selection) => {
                 let input: LogicalPlan = convert_box_required!(selection.input)?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .filter(
                         selection
                             .expr
@@ -86,7 +86,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .iter()
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .window(window_expr)?
                     .build()
                     .map_err(|e| e.into())
@@ -103,7 +103,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .iter()
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .aggregate(group_expr, aggr_expr)?
                     .build()
                     .map_err(|e| e.into())
@@ -172,7 +172,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .iter()
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<Expr>, _>>()?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .sort(sort_expr)?
                     .build()
                     .map_err(|e| e.into())
@@ -203,7 +203,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     }
                 };
 
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .repartition(partitioning_scheme)?
                     .build()
                     .map_err(|e| e.into())
@@ -233,14 +233,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
             }
             LogicalPlanType::Explain(explain) => {
                 let input: LogicalPlan = convert_box_required!(explain.input)?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .explain(explain.verbose)?
                     .build()
                     .map_err(|e| e.into())
             }
             LogicalPlanType::Limit(limit) => {
                 let input: LogicalPlan = convert_box_required!(limit.input)?;
-                LogicalPlanBuilder::from(&input)
+                LogicalPlanBuilder::from(input)
                     .limit(limit.limit as usize)?
                     .build()
                     .map_err(|e| e.into())
@@ -265,7 +265,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     protobuf::JoinType::Semi => JoinType::Semi,
                     protobuf::JoinType::Anti => JoinType::Anti,
                 };
-                LogicalPlanBuilder::from(&convert_box_required!(join.left)?)
+                LogicalPlanBuilder::from(convert_box_required!(join.left)?)
                     .join(
                         &convert_box_required!(join.right)?,
                         join_type,
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 1652630..8ce408d 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1073,7 +1073,7 @@ mod tests {
         let ctx = create_ctx(&tmp_dir, partition_count)?;
 
         let table = ctx.table("test")?;
-        let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
+        let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
             .project(vec![col("c2")])?
             .build()?;
 
@@ -2566,7 +2566,7 @@ mod tests {
 
         let t = ctx.table("t")?;
 
-        let plan = LogicalPlanBuilder::from(&t.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(t.to_logical_plan())
             .project(vec![
                 col("a"),
                 col("b"),
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 99eb7f0..7cf7797 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -63,7 +63,7 @@ impl DataFrame for DataFrameImpl {
 
     /// Create a projection based on arbitrary expressions
     fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .project(expr_list)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -71,7 +71,7 @@ impl DataFrame for DataFrameImpl {
 
     /// Create a filter based on a predicate expression
     fn filter(&self, predicate: Expr) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .filter(predicate)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -83,7 +83,7 @@ impl DataFrame for DataFrameImpl {
         group_expr: Vec<Expr>,
         aggr_expr: Vec<Expr>,
     ) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .aggregate(group_expr, aggr_expr)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -91,13 +91,17 @@ impl DataFrame for DataFrameImpl {
 
     /// Limit the number of rows
     fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+            .limit(n)?
+            .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
     /// Sort by specified sorting expressions
     fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+            .sort(expr)?
+            .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
@@ -109,7 +113,7 @@ impl DataFrame for DataFrameImpl {
         left_cols: &[&str],
         right_cols: &[&str],
     ) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .join(
                 &right.to_logical_plan(),
                 join_type,
@@ -124,7 +128,7 @@ impl DataFrame for DataFrameImpl {
         &self,
         partitioning_scheme: Partitioning,
     ) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .repartition(partitioning_scheme)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -161,7 +165,7 @@ impl DataFrame for DataFrameImpl {
     }
 
     fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .explain(verbose)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -173,7 +177,7 @@ impl DataFrame for DataFrameImpl {
     }
 
     fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
+        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
             .union(dataframe.to_logical_plan())?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index ced77ba..17fe663 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -89,15 +89,15 @@ pub struct LogicalPlanBuilder {
 
 impl LogicalPlanBuilder {
     /// Create a builder from an existing plan
-    pub fn from(plan: &LogicalPlan) -> Self {
-        Self { plan: plan.clone() }
+    pub fn from(plan: LogicalPlan) -> Self {
+        Self { plan }
     }
 
     /// Create an empty relation.
     ///
     /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
     pub fn empty(produce_one_row: bool) -> Self {
-        Self::from(&LogicalPlan::EmptyRelation {
+        Self::from(LogicalPlan::EmptyRelation {
             produce_one_row,
             schema: DFSchemaRef::new(DFSchema::empty()),
         })
@@ -202,7 +202,7 @@ impl LogicalPlanBuilder {
             limit: None,
         };
 
-        Ok(Self::from(&table_scan))
+        Ok(Self::from(table_scan))
     }
 
     /// Apply a projection.
@@ -234,7 +234,7 @@ impl LogicalPlanBuilder {
 
         let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
 
-        Ok(Self::from(&LogicalPlan::Projection {
+        Ok(Self::from(LogicalPlan::Projection {
             expr: projected_expr,
             input: Arc::new(self.plan.clone()),
             schema: DFSchemaRef::new(schema),
@@ -244,7 +244,7 @@ impl LogicalPlanBuilder {
     /// Apply a filter
     pub fn filter(&self, expr: Expr) -> Result<Self> {
         let expr = normalize_col(expr, &self.plan.all_schemas())?;
-        Ok(Self::from(&LogicalPlan::Filter {
+        Ok(Self::from(LogicalPlan::Filter {
             predicate: expr,
             input: Arc::new(self.plan.clone()),
         }))
@@ -252,7 +252,7 @@ impl LogicalPlanBuilder {
 
     /// Apply a limit
     pub fn limit(&self, n: usize) -> Result<Self> {
-        Ok(Self::from(&LogicalPlan::Limit {
+        Ok(Self::from(LogicalPlan::Limit {
             n,
             input: Arc::new(self.plan.clone()),
         }))
@@ -261,7 +261,7 @@ impl LogicalPlanBuilder {
     /// Apply a sort
     pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
         let schemas = self.plan.all_schemas();
-        Ok(Self::from(&LogicalPlan::Sort {
+        Ok(Self::from(LogicalPlan::Sort {
             expr: normalize_cols(exprs, &schemas)?,
             input: Arc::new(self.plan.clone()),
         }))
@@ -269,11 +269,7 @@ impl LogicalPlanBuilder {
 
     /// Apply a union
     pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
-        Ok(Self::from(&union_with_alias(
-            self.plan.clone(),
-            plan,
-            None,
-        )?))
+        Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
     }
 
     /// Apply a join with on constraint
@@ -307,7 +303,7 @@ impl LogicalPlanBuilder {
             &JoinConstraint::On,
         )?;
 
-        Ok(Self::from(&LogicalPlan::Join {
+        Ok(Self::from(LogicalPlan::Join {
             left: Arc::new(self.plan.clone()),
             right: Arc::new(right.clone()),
             on,
@@ -343,7 +339,7 @@ impl LogicalPlanBuilder {
             &JoinConstraint::Using,
         )?;
 
-        Ok(Self::from(&LogicalPlan::Join {
+        Ok(Self::from(LogicalPlan::Join {
             left: Arc::new(self.plan.clone()),
             right: Arc::new(right.clone()),
             on,
@@ -356,7 +352,7 @@ impl LogicalPlanBuilder {
     /// Apply a cross join
     pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
         let schema = self.plan.schema().join(right.schema())?;
-        Ok(Self::from(&LogicalPlan::CrossJoin {
+        Ok(Self::from(LogicalPlan::CrossJoin {
             left: Arc::new(self.plan.clone()),
             right: Arc::new(right.clone()),
             schema: DFSchemaRef::new(schema),
@@ -365,7 +361,7 @@ impl LogicalPlanBuilder {
 
     /// Repartition
     pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
-        Ok(Self::from(&LogicalPlan::Repartition {
+        Ok(Self::from(LogicalPlan::Repartition {
             input: Arc::new(self.plan.clone()),
             partitioning_scheme,
         }))
@@ -379,7 +375,7 @@ impl LogicalPlanBuilder {
         let mut window_fields: Vec<DFField> =
             exprlist_to_fields(all_expr, self.plan.schema())?;
         window_fields.extend_from_slice(self.plan.schema().fields());
-        Ok(Self::from(&LogicalPlan::Window {
+        Ok(Self::from(LogicalPlan::Window {
             input: Arc::new(self.plan.clone()),
             window_expr,
             schema: Arc::new(DFSchema::new(window_fields)?),
@@ -404,7 +400,7 @@ impl LogicalPlanBuilder {
         let aggr_schema =
             DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
 
-        Ok(Self::from(&LogicalPlan::Aggregate {
+        Ok(Self::from(LogicalPlan::Aggregate {
             input: Arc::new(self.plan.clone()),
             group_expr,
             aggr_expr,
@@ -421,7 +417,7 @@ impl LogicalPlanBuilder {
 
         let schema = LogicalPlan::explain_schema();
 
-        Ok(Self::from(&LogicalPlan::Explain {
+        Ok(Self::from(LogicalPlan::Explain {
             verbose,
             plan: Arc::new(self.plan.clone()),
             stringified_plans,
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 956f74a..79833df 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -544,7 +544,7 @@ mod tests {
     #[test]
     fn optimize_plan_eq_expr() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("b").eq(lit(true)))?
             .filter(col("c").eq(lit(false)))?
             .project(vec![col("a")])?
@@ -563,7 +563,7 @@ mod tests {
     #[test]
     fn optimize_plan_not_eq_expr() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("b").not_eq(lit(true)))?
             .filter(col("c").not_eq(lit(false)))?
             .limit(1)?
@@ -584,7 +584,7 @@ mod tests {
     #[test]
     fn optimize_plan_and_expr() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("b").not_eq(lit(true)).and(col("c").eq(lit(true))))?
             .project(vec![col("a")])?
             .build()?;
@@ -601,7 +601,7 @@ mod tests {
     #[test]
     fn optimize_plan_or_expr() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("b").not_eq(lit(true)).or(col("c").eq(lit(false))))?
             .project(vec![col("a")])?
             .build()?;
@@ -618,7 +618,7 @@ mod tests {
     #[test]
     fn optimize_plan_not_expr() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("b").eq(lit(false)).not())?
             .project(vec![col("a")])?
             .build()?;
@@ -635,7 +635,7 @@ mod tests {
     #[test]
     fn optimize_plan_support_projection() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("d"), col("b").eq(lit(false))])?
             .build()?;
 
@@ -650,7 +650,7 @@ mod tests {
     #[test]
     fn optimize_plan_support_aggregate() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("c"), col("b")])?
             .aggregate(
                 vec![col("a"), col("c")],
@@ -691,7 +691,7 @@ mod tests {
             )))],
             fun: BuiltinScalarFunction::ToTimestamp,
         }];
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
@@ -713,7 +713,7 @@ mod tests {
             )))],
             fun: BuiltinScalarFunction::ToTimestamp,
         }];
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
@@ -732,7 +732,7 @@ mod tests {
             args: vec![],
             fun: BuiltinScalarFunction::ToTimestamp,
         }];
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
@@ -751,7 +751,7 @@ mod tests {
             expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some("0".to_string())))),
             data_type: DataType::Int32,
         }];
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
@@ -770,7 +770,7 @@ mod tests {
             expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some("".to_string())))),
             data_type: DataType::Int32,
         }];
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
@@ -790,7 +790,7 @@ mod tests {
             fun: BuiltinScalarFunction::Now,
         }];
         let time = chrono::Utc::now();
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
@@ -823,7 +823,7 @@ mod tests {
                 "t2".to_string(),
             ),
         ];
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(proj)
             .unwrap()
             .build()
diff --git a/datafusion/src/optimizer/eliminate_limit.rs b/datafusion/src/optimizer/eliminate_limit.rs
index 4b5a634..bf3f2b3 100644
--- a/datafusion/src/optimizer/eliminate_limit.rs
+++ b/datafusion/src/optimizer/eliminate_limit.rs
@@ -88,7 +88,7 @@ mod tests {
     #[test]
     fn limit_0_root() {
         let table_scan = test_table_scan().unwrap();
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![sum(col("b"))])
             .unwrap()
             .limit(0)
@@ -104,12 +104,12 @@ mod tests {
     #[test]
     fn limit_0_nested() {
         let table_scan = test_table_scan().unwrap();
-        let plan1 = LogicalPlanBuilder::from(&table_scan)
+        let plan1 = LogicalPlanBuilder::from(table_scan.clone())
             .aggregate(vec![col("a")], vec![sum(col("b"))])
             .unwrap()
             .build()
             .unwrap();
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![sum(col("b"))])
             .unwrap()
             .limit(0)
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index e5f8dcf..7b1ff32 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -483,7 +483,7 @@ mod tests {
     #[test]
     fn filter_before_projection() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
             .filter(col("a").eq(lit(1i64)))?
             .build()?;
@@ -499,7 +499,7 @@ mod tests {
     #[test]
     fn filter_after_limit() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
             .limit(10)?
             .filter(col("a").eq(lit(1i64)))?
@@ -517,7 +517,7 @@ mod tests {
     #[test]
     fn filter_no_columns() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(lit(0i64).eq(lit(1i64)))?
             .build()?;
         let expected = "\
@@ -530,7 +530,7 @@ mod tests {
     #[test]
     fn filter_jump_2_plans() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b"), col("c")])?
             .project(vec![col("c"), col("b")])?
             .filter(col("a").eq(lit(1i64)))?
@@ -548,7 +548,7 @@ mod tests {
     #[test]
     fn filter_move_agg() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![sum(col("b")).alias("total_salary")])?
             .filter(col("a").gt(lit(10i64)))?
             .build()?;
@@ -564,7 +564,7 @@ mod tests {
     #[test]
     fn filter_keep_agg() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])?
             .filter(col("b").gt(lit(10i64)))?
             .build()?;
@@ -581,7 +581,7 @@ mod tests {
     #[test]
     fn alias() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a").alias("b"), col("c")])?
             .filter(col("b").eq(lit(1i64)))?
             .build()?;
@@ -614,7 +614,7 @@ mod tests {
     #[test]
     fn complex_expression() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![
                 add(multiply(col("a"), lit(2)), col("c")).alias("b"),
                 col("c"),
@@ -644,7 +644,7 @@ mod tests {
     #[test]
     fn complex_plan() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![
                 add(multiply(col("a"), lit(2)), col("c")).alias("b"),
                 col("c"),
@@ -680,7 +680,7 @@ mod tests {
     fn multi_filter() -> Result<()> {
         // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a").alias("b"), col("c")])?
             .aggregate(vec![col("b")], vec![sum(col("c"))])?
             .filter(col("b").gt(lit(10i64)))?
@@ -716,7 +716,7 @@ mod tests {
     fn split_filter() -> Result<()> {
         // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a").alias("b"), col("c")])?
             .aggregate(vec![col("b")], vec![sum(col("c"))])?
             .filter(and(
@@ -751,7 +751,7 @@ mod tests {
     #[test]
     fn double_limit() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
             .limit(20)?
             .limit(10)?
@@ -773,8 +773,8 @@ mod tests {
     #[test]
     fn union_all() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
-            .union(LogicalPlanBuilder::from(&table_scan).build()?)?
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .union(LogicalPlanBuilder::from(table_scan).build()?)?
             .filter(col("a").eq(lit(1i64)))?
             .build()?;
         // filter appears below Union
@@ -792,7 +792,7 @@ mod tests {
     #[test]
     fn filter_2_breaks_limits() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
             .filter(col("a").lt_eq(lit(1i64)))?
             .limit(1)?
@@ -828,7 +828,7 @@ mod tests {
     #[test]
     fn two_filters_on_same_depth() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .limit(1)?
             .filter(col("a").lt_eq(lit(1i64)))?
             .filter(col("a").gt_eq(lit(1i64)))?
@@ -860,7 +860,7 @@ mod tests {
     #[test]
     fn filters_user_defined_node() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("a").lt_eq(lit(1i64)))?
             .build()?;
 
@@ -882,11 +882,11 @@ mod tests {
     #[test]
     fn filter_join_on_common_independent() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let left = LogicalPlanBuilder::from(&table_scan).build()?;
-        let right = LogicalPlanBuilder::from(&table_scan)
+        let left = LogicalPlanBuilder::from(table_scan.clone()).build()?;
+        let right = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
             .build()?;
-        let plan = LogicalPlanBuilder::from(&left)
+        let plan = LogicalPlanBuilder::from(left)
             .join(
                 &right,
                 JoinType::Inner,
@@ -923,13 +923,13 @@ mod tests {
     #[test]
     fn filter_join_on_common_dependent() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let left = LogicalPlanBuilder::from(&table_scan)
+        let left = LogicalPlanBuilder::from(table_scan.clone())
             .project(vec![col("a"), col("c")])?
             .build()?;
-        let right = LogicalPlanBuilder::from(&table_scan)
+        let right = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
             .build()?;
-        let plan = LogicalPlanBuilder::from(&left)
+        let plan = LogicalPlanBuilder::from(left)
             .join(
                 &right,
                 JoinType::Inner,
@@ -962,13 +962,13 @@ mod tests {
     #[test]
     fn filter_join_on_one_side() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let left = LogicalPlanBuilder::from(&table_scan)
+        let left = LogicalPlanBuilder::from(table_scan.clone())
             .project(vec![col("a"), col("b")])?
             .build()?;
-        let right = LogicalPlanBuilder::from(&table_scan)
+        let right = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("c")])?
             .build()?;
-        let plan = LogicalPlanBuilder::from(&left)
+        let plan = LogicalPlanBuilder::from(left)
             .join(
                 &right,
                 JoinType::Inner,
@@ -1060,7 +1060,7 @@ mod tests {
             limit: None,
         };
 
-        LogicalPlanBuilder::from(&table_scan)
+        LogicalPlanBuilder::from(table_scan)
             .filter(col("a").eq(lit(1i64)))?
             .build()
     }
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs
index a2a99ae..ecb3b40 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -166,7 +166,8 @@ impl OptimizerRule for HashBuildProbeOrder {
                 let left = self.optimize(left, execution_props)?;
                 let right = self.optimize(right, execution_props)?;
                 if should_swap_join_order(&left, &right) {
-                    let swapped = LogicalPlanBuilder::from(&right).cross_join(&left)?;
+                    let swapped =
+                        LogicalPlanBuilder::from(right.clone()).cross_join(&left)?;
                     // wrap plan with projection to maintain column order
                     let left_cols = left
                         .schema()
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index afd9937..21b82a6 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -155,7 +155,7 @@ mod test {
     fn limit_pushdown_projection_table_provider() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
             .limit(1000)?
             .build()?;
@@ -174,7 +174,7 @@ mod test {
     fn limit_push_down_take_smaller_limit() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .limit(1000)?
             .limit(10)?
             .build()?;
@@ -195,7 +195,7 @@ mod test {
     fn limit_doesnt_push_down_aggregation() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![max(col("b"))])?
             .limit(1000)?
             .build()?;
@@ -214,8 +214,8 @@ mod test {
     fn limit_should_push_down_union() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
-            .union(LogicalPlanBuilder::from(&table_scan).build()?)?
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .union(LogicalPlanBuilder::from(table_scan).build()?)?
             .limit(1000)?
             .build()?;
 
@@ -236,7 +236,7 @@ mod test {
     fn multi_stage_limit_recurses_to_deeper_limit() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .limit(1000)?
             .aggregate(vec![col("a")], vec![max(col("b"))])?
             .limit(10)?
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 2cd7384..4bf2b6e 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -259,7 +259,7 @@ fn optimize_plan(
                 &mut new_required_columns,
             )?;
 
-            LogicalPlanBuilder::from(&optimize_plan(
+            LogicalPlanBuilder::from(optimize_plan(
                 optimizer,
                 input,
                 &new_required_columns,
@@ -452,7 +452,7 @@ mod tests {
     fn aggregate_no_group_by() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![], vec![max(col("b"))])?
             .build()?;
 
@@ -468,7 +468,7 @@ mod tests {
     fn aggregate_group_by() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("c")], vec![max(col("b"))])?
             .build()?;
 
@@ -484,7 +484,7 @@ mod tests {
     fn aggregate_no_group_by_with_filter() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .filter(col("c"))?
             .aggregate(vec![], vec![max(col("b"))])?
             .build()?;
@@ -506,7 +506,7 @@ mod tests {
         let table2_scan =
             LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .join(&table2_scan, JoinType::Left, vec!["a"], vec!["c1"])?
             .project(vec![col("a"), col("b"), col("c1")])?
             .build()?;
@@ -539,7 +539,7 @@ mod tests {
     fn cast() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let projection = LogicalPlanBuilder::from(&table_scan)
+        let projection = LogicalPlanBuilder::from(table_scan)
             .project(vec![Expr::Cast {
                 expr: Box::new(col("c")),
                 data_type: DataType::Float64,
@@ -560,7 +560,7 @@ mod tests {
         assert_eq!(3, table_scan.schema().fields().len());
         assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
             .build()?;
 
@@ -609,7 +609,7 @@ mod tests {
         assert_eq!(3, table_scan.schema().fields().len());
         assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("c"), col("a")])?
             .limit(5)?
             .build()?;
@@ -628,7 +628,7 @@ mod tests {
     #[test]
     fn table_scan_without_projection() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan).build()?;
+        let plan = LogicalPlanBuilder::from(table_scan).build()?;
         // should expand projection to all columns without projection
         let expected = "TableScan: test projection=Some([0, 1, 2])";
         assert_optimized_plan_eq(&plan, expected);
@@ -638,7 +638,7 @@ mod tests {
     #[test]
     fn table_scan_with_literal_projection() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![lit(1_i64), lit(2_i64)])?
             .build()?;
         let expected = "Projection: Int64(1), Int64(2)\
@@ -655,7 +655,7 @@ mod tests {
         assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
 
         // we never use "b" in the first projection => remove it
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("c"), col("a"), col("b")])?
             .filter(col("c").gt(lit(1)))?
             .aggregate(vec![col("c")], vec![max(col("a"))])?
@@ -682,7 +682,7 @@ mod tests {
         assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
 
         // there is no need for the first projection
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("b")])?
             .project(vec![lit(1).alias("a")])?
             .build()?;
@@ -703,7 +703,7 @@ mod tests {
     fn test_double_optimization() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("b")])?
             .project(vec![lit(1).alias("a")])?
             .build()?;
@@ -726,7 +726,7 @@ mod tests {
         assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
 
         // we never use "min(b)" => remove it
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
             .filter(col("c").gt(lit(1)))?
             .project(vec![col("c"), col("a"), col("MAX(test.b)")])?
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index 4253d2f..0e65de0 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -502,7 +502,7 @@ mod tests {
     #[test]
     fn test_simplify_optimized_plan() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
             .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))?
             .build()?;
@@ -521,7 +521,7 @@ mod tests {
     #[test]
     fn test_simplify_optimized_plan_with_composed_and() -> Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(&table_scan)
+        let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
             .filter(and(
                 and(col("a").gt(lit(5)), col("b").lt(lit(6))),
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 76f44b8..394308f 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -232,7 +232,7 @@ pub fn from_plan(
             })
         }
         LogicalPlan::CrossJoin { .. } => {
-            let left = &inputs[0];
+            let left = inputs[0].clone();
             let right = &inputs[1];
             LogicalPlanBuilder::from(left).cross_join(right)?.build()
         }
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 0691ce6..a2ff456 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -143,9 +143,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
         let plan = self.set_expr_to_plan(set_expr, alias, ctes)?;
 
-        let plan = self.order_by(&plan, &query.order_by)?;
+        let plan = self.order_by(plan, &query.order_by)?;
 
-        self.limit(&plan, &query.limit)
+        self.limit(plan, &query.limit)
     }
 
     fn set_expr_to_plan(
@@ -309,9 +309,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         match t.joins.len() {
             0 => Ok(left),
             n => {
-                let mut left = self.parse_relation_join(&left, &t.joins[0], ctes)?;
+                let mut left = self.parse_relation_join(left, &t.joins[0], ctes)?;
                 for i in 1..n {
-                    left = self.parse_relation_join(&left, &t.joins[i], ctes)?;
+                    left = self.parse_relation_join(left, &t.joins[i], ctes)?;
                 }
                 Ok(left)
             }
@@ -320,7 +320,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     fn parse_relation_join(
         &self,
-        left: &LogicalPlan,
+        left: LogicalPlan,
         join: &Join,
         ctes: &mut HashMap<String, LogicalPlan>,
     ) -> Result<LogicalPlan> {
@@ -347,7 +347,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
     fn parse_cross_join(
         &self,
-        left: &LogicalPlan,
+        left: LogicalPlan,
         right: &LogicalPlan,
     ) -> Result<LogicalPlan> {
         LogicalPlanBuilder::from(left).cross_join(right)?.build()
@@ -355,7 +355,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     fn parse_join(
         &self,
-        left: &LogicalPlan,
+        left: LogicalPlan,
         right: &LogicalPlan,
         constraint: &JoinConstraint,
         join_type: JoinType,
@@ -489,13 +489,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     }
                     if join_keys.is_empty() {
                         left =
-                            LogicalPlanBuilder::from(&left).cross_join(right)?.build()?;
+                            LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
                     } else {
                         let left_keys: Vec<Column> =
                             join_keys.iter().map(|(l, _)| l.clone()).collect();
                         let right_keys: Vec<Column> =
                             join_keys.iter().map(|(_, r)| r.clone()).collect();
-                        let builder = LogicalPlanBuilder::from(&left);
+                        let builder = LogicalPlanBuilder::from(left);
                         left = builder
                             .join(right, JoinType::Inner, left_keys, right_keys)?
                             .build()?;
@@ -507,7 +507,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // remove join expressions from filter
                 match remove_join_expressions(&filter_expr, &all_join_keys)? {
                     Some(filter_expr) => {
-                        LogicalPlanBuilder::from(&left).filter(filter_expr)?.build()
+                        LogicalPlanBuilder::from(left).filter(filter_expr)?.build()
                     }
                     _ => Ok(left),
                 }
@@ -519,7 +519,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     let mut left = plans[0].clone();
                     for right in plans.iter().skip(1) {
                         left =
-                            LogicalPlanBuilder::from(&left).cross_join(right)?.build()?;
+                            LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
                     }
                     Ok(left)
                 }
@@ -531,7 +531,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let select_exprs = self.prepare_select_exprs(&plan, &select.projection)?;
 
         // having and group by clause may reference aliases defined in select projection
-        let projected_plan = self.project(&plan, select_exprs.clone())?;
+        let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
         let mut combined_schema = (**projected_plan.schema()).clone();
         combined_schema.merge(plan.schema());
 
@@ -597,7 +597,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             || !aggr_exprs.is_empty()
         {
             self.aggregate(
-                &plan,
+                plan,
                 &select_exprs,
                 &having_expr_opt,
                 group_by_exprs,
@@ -625,7 +625,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         };
 
         let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt {
-            LogicalPlanBuilder::from(&plan)
+            LogicalPlanBuilder::from(plan)
                 .filter(having_expr_post_aggr)?
                 .build()?
         } else {
@@ -642,14 +642,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         };
 
         let plan = if select.distinct {
-            return LogicalPlanBuilder::from(&plan)
+            return LogicalPlanBuilder::from(plan)
                 .aggregate(select_exprs_post_aggr, vec![])?
                 .build();
         } else {
             plan
         };
 
-        self.project(&plan, select_exprs_post_aggr)
+        self.project(plan, select_exprs_post_aggr)
     }
 
     /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
@@ -672,7 +672,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
 
     /// Wrap a plan in a projection
-    fn project(&self, input: &LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
+    fn project(&self, input: LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
         self.validate_schema_satisfies_exprs(input.schema(), &expr)?;
         LogicalPlanBuilder::from(input).project(expr)?.build()
     }
@@ -691,7 +691,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
             // the partition and sort itself is done at physical level, see physical_planner's
             // fn create_initial_plan
-            plan = LogicalPlanBuilder::from(&plan)
+            plan = LogicalPlanBuilder::from(plan)
                 .window(window_exprs)?
                 .build()?;
         }
@@ -702,7 +702,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     /// Wrap a plan in an aggregate
     fn aggregate(
         &self,
-        input: &LogicalPlan,
+        input: LogicalPlan,
         select_exprs: &[Expr],
         having_expr_opt: &Option<Expr>,
         group_by_exprs: Vec<Expr>,
@@ -714,7 +714,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .cloned()
             .collect::<Vec<Expr>>();
 
-        let plan = LogicalPlanBuilder::from(input)
+        let plan = LogicalPlanBuilder::from(input.clone())
             .aggregate(group_by_exprs, aggr_exprs)?
             .build()?;
 
@@ -722,14 +722,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // available to next phases of planning.
         let column_exprs_post_aggr = aggr_projection_exprs
             .iter()
-            .map(|expr| expr_as_column_expr(expr, input))
+            .map(|expr| expr_as_column_expr(expr, &input))
             .collect::<Result<Vec<Expr>>>()?;
 
         // Rewrite the SELECT expression to use the columns produced by the
         // aggregation.
         let select_exprs_post_aggr = select_exprs
             .iter()
-            .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
+            .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input))
             .collect::<Result<Vec<Expr>>>()?;
 
         if !can_columns_satisfy_exprs(&column_exprs_post_aggr, &select_exprs_post_aggr)? {
@@ -742,7 +742,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // aggregation.
         let having_expr_post_aggr_opt = if let Some(having_expr) = having_expr_opt {
             let having_expr_post_aggr =
-                rebase_expr(having_expr, &aggr_projection_exprs, input)?;
+                rebase_expr(having_expr, &aggr_projection_exprs, &input)?;
 
             if !can_columns_satisfy_exprs(
                 &column_exprs_post_aggr,
@@ -762,7 +762,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
 
     /// Wrap a plan in a limit
-    fn limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan> {
+    fn limit(&self, input: LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan> {
         match *limit {
             Some(ref limit_expr) => {
                 let n = match self.sql_to_rex(limit_expr, input.schema())? {
@@ -774,18 +774,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 LogicalPlanBuilder::from(input).limit(n)?.build()
             }
-            _ => Ok(input.clone()),
+            _ => Ok(input),
         }
     }
 
     /// Wrap the logical in a sort
     fn order_by(
         &self,
-        plan: &LogicalPlan,
+        plan: LogicalPlan,
         order_by: &[OrderByExpr],
     ) -> Result<LogicalPlan> {
         if order_by.is_empty() {
-            return Ok(plan.clone());
+            return Ok(plan);
         }
 
         let order_by_rex = order_by
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 75fbe8e..36adbea 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -178,7 +178,7 @@ async fn custom_source_dataframe() -> Result<()> {
     let mut ctx = ExecutionContext::new();
 
     let table = ctx.read_table(Arc::new(CustomTableProvider))?;
-    let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
+    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
         .project(vec![col("c2")])?
         .build()?;