You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/29 15:12:20 UTC

[arrow-datafusion] branch master updated: Stop optimizing queries twice (#2369)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a69f529e Stop optimizing queries twice (#2369)
6a69f529e is described below

commit 6a69f529edb3087aeba57c9f01031a98ad06dd5d
Author: Andy Grove <ag...@apache.org>
AuthorDate: Fri Apr 29 09:12:14 2022 -0600

    Stop optimizing queries twice (#2369)
---
 ballista/rust/client/src/context.rs                |  6 +-
 ballista/rust/scheduler/src/planner.rs             |  6 +-
 .../rust/scheduler/src/scheduler_server/grpc.rs    | 11 +--
 benchmarks/src/bin/tpch.rs                         |  2 +-
 datafusion/core/Cargo.toml                         |  4 +
 datafusion/core/benches/sql_planner.rs             | 93 ++++++++++++++++++++++
 datafusion/core/src/dataframe.rs                   | 64 +++++++--------
 datafusion/core/src/execution/context.rs           | 28 +------
 datafusion/core/tests/custom_sources.rs            |  4 +-
 datafusion/core/tests/parquet_pruning.rs           |  8 +-
 datafusion/core/tests/path_partition.rs            |  4 +-
 datafusion/core/tests/sql/projection.rs            |  2 +-
 datafusion/core/tests/sql/udf.rs                   |  4 +-
 datafusion/core/tests/statistics.rs                | 10 +--
 14 files changed, 165 insertions(+), 81 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 5f77b369e..a413c5289 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -270,7 +270,7 @@ impl BallistaContext {
         path: &str,
         options: CsvReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_csv(path, options).await?.to_logical_plan() {
+        match self.read_csv(path, options).await?.to_logical_plan()? {
             LogicalPlan::TableScan(TableScan { source, .. }) => {
                 self.register_table(name, source_as_provider(&source)?)
             }
@@ -284,7 +284,7 @@ impl BallistaContext {
         path: &str,
         options: ParquetReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_parquet(path, options).await?.to_logical_plan() {
+        match self.read_parquet(path, options).await?.to_logical_plan()? {
             LogicalPlan::TableScan(TableScan { source, .. }) => {
                 self.register_table(name, source_as_provider(&source)?)
             }
@@ -298,7 +298,7 @@ impl BallistaContext {
         path: &str,
         options: AvroReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_avro(path, options).await?.to_logical_plan() {
+        match self.read_avro(path, options).await?.to_logical_plan()? {
             LogicalPlan::TableScan(TableScan { source, .. }) => {
                 self.register_table(name, source_as_provider(&source)?)
             }
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 8198c4ed2..07168adb6 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -311,7 +311,7 @@ mod test {
             )
             .await?;
 
-        let plan = df.to_logical_plan();
+        let plan = df.to_logical_plan()?;
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan).await?;
 
@@ -425,7 +425,7 @@ order by
             )
             .await?;
 
-        let plan = df.to_logical_plan();
+        let plan = df.to_logical_plan()?;
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan).await?;
 
@@ -573,7 +573,7 @@ order by
             )
             .await?;
 
-        let plan = df.to_logical_plan();
+        let plan = df.to_logical_plan()?;
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan).await?;
 
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 27fe64c4c..ce2c88ccc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -380,14 +380,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                         error!("{}", msg);
                         tonic::Status::internal(msg)
                     })?,
-                Query::Sql(sql) => {
-                    let df = df_session.sql(&sql).await.map_err(|e| {
+                Query::Sql(sql) => df_session
+                    .sql(&sql)
+                    .await
+                    .and_then(|df| df.to_logical_plan())
+                    .map_err(|e| {
                         let msg = format!("Error parsing SQL: {}", e);
                         error!("{}", msg);
                         tonic::Status::internal(msg)
-                    })?;
-                    df.to_logical_plan()
-                }
+                    })?,
             };
             debug!("Received plan for execution: {:?}", plan);
 
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1060bd2e0..ef8abb01c 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -655,7 +655,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
         }
 
         // create the physical plan
-        let csv = csv.to_logical_plan();
+        let csv = csv.to_logical_plan()?;
         let csv = ctx.optimize(&csv)?;
         let csv = ctx.create_physical_plan(&csv).await?;
 
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 533b38b81..a119316a8 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -122,6 +122,10 @@ name = "physical_plan"
 harness = false
 name = "parquet_query_sql"
 
+[[bench]]
+harness = false
+name = "sql_planner"
+
 [[bench]]
 harness = false
 name = "jit"
diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
new file mode 100644
index 000000000..ecdd59d8b
--- /dev/null
+++ b/datafusion/core/benches/sql_planner.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate arrow;
+extern crate datafusion;
+
+mod data_utils;
+use crate::criterion::Criterion;
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::datasource::MemTable;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionContext;
+use parking_lot::Mutex;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+
+fn plan(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
+    let rt = Runtime::new().unwrap();
+    criterion::black_box(rt.block_on(ctx.lock().sql(sql)).unwrap());
+}
+
+/// Create schema representing a large table
+pub fn create_schema(column_prefix: &str) -> Schema {
+    let fields = (0..200)
+        .map(|i| Field::new(&format!("{}{}", column_prefix, i), DataType::Int32, true))
+        .collect();
+    Schema::new(fields)
+}
+
+pub fn create_table_provider(column_prefix: &str) -> Result<Arc<MemTable>> {
+    let schema = Arc::new(create_schema(column_prefix));
+    MemTable::try_new(schema, vec![]).map(Arc::new)
+}
+
+fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
+    let ctx = SessionContext::new();
+    ctx.register_table("t1", create_table_provider("a")?)?;
+    ctx.register_table("t2", create_table_provider("b")?)?;
+    Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let ctx = create_context().unwrap();
+
+    c.bench_function("trivial join low numbered columns", |b| {
+        b.iter(|| {
+            plan(
+                ctx.clone(),
+                "SELECT t1.a2, t2.b2  \
+                 FROM t1, t2 WHERE a1 = b1",
+            )
+        })
+    });
+
+    c.bench_function("trivial join high numbered columns", |b| {
+        b.iter(|| {
+            plan(
+                ctx.clone(),
+                "SELECT t1.a99, t2.b99  \
+                 FROM t1, t2 WHERE a199 = b199",
+            )
+        })
+    });
+
+    c.bench_function("aggregate with join", |b| {
+        b.iter(|| {
+            plan(
+                ctx.clone(),
+                "SELECT t1.a99, MIN(t2.b1), MAX(t2.b199), AVG(t2.b123), COUNT(t2.b73)  \
+                 FROM t1 JOIN t2 ON t1.a199 = t2.b199 GROUP BY t1.a99",
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5a7017486..369c2ae93 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -86,8 +86,7 @@ impl DataFrame {
     /// Create a physical plan
     pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
         let state = self.session_state.read().clone();
-        let optimized_plan = state.optimize(&self.plan)?;
-        state.create_physical_plan(&optimized_plan).await
+        state.create_physical_plan(&self.plan).await
     }
 
     /// Filter the DataFrame by column. Returns a new DataFrame only containing the
@@ -129,9 +128,9 @@ impl DataFrame {
     pub fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<DataFrame>> {
         let window_func_exprs = find_window_exprs(&expr_list);
         let plan = if window_func_exprs.is_empty() {
-            self.to_logical_plan()
+            self.plan.clone()
         } else {
-            LogicalPlanBuilder::window_plan(self.to_logical_plan(), window_func_exprs)?
+            LogicalPlanBuilder::window_plan(self.plan.clone(), window_func_exprs)?
         };
         let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
 
@@ -155,7 +154,7 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn filter(&self, predicate: Expr) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .filter(predicate)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -184,7 +183,7 @@ impl DataFrame {
         group_expr: Vec<Expr>,
         aggr_expr: Vec<Expr>,
     ) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .aggregate(group_expr, aggr_expr)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -204,7 +203,7 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn limit(&self, n: usize) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .limit(n)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -224,8 +223,8 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn union(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
-            .union(dataframe.to_logical_plan())?
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
+            .union(dataframe.plan.clone())?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
     }
@@ -247,7 +246,7 @@ impl DataFrame {
     pub fn distinct(&self) -> Result<Arc<DataFrame>> {
         Ok(Arc::new(DataFrame::new(
             self.session_state.clone(),
-            &LogicalPlanBuilder::from(self.to_logical_plan())
+            &LogicalPlanBuilder::from(self.plan.clone())
                 .distinct()?
                 .build()?,
         )))
@@ -268,7 +267,7 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn sort(&self, expr: Vec<Expr>) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .sort(expr)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -300,9 +299,9 @@ impl DataFrame {
         left_cols: &[&str],
         right_cols: &[&str],
     ) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .join(
-                &right.to_logical_plan(),
+                &right.plan.clone(),
                 join_type,
                 (left_cols.to_vec(), right_cols.to_vec()),
             )?
@@ -329,7 +328,7 @@ impl DataFrame {
         &self,
         partitioning_scheme: Partitioning,
     ) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .repartition(partitioning_scheme)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -470,8 +469,10 @@ impl DataFrame {
     }
 
     /// Return the logical plan represented by this DataFrame.
-    pub fn to_logical_plan(&self) -> LogicalPlan {
-        self.plan.clone()
+    pub fn to_logical_plan(&self) -> Result<LogicalPlan> {
+        // Optimize the plan first for better UX
+        let state = self.session_state.read().clone();
+        state.optimize(&self.plan)
     }
 
     /// Return a DataFrame with the explanation of its plan so far.
@@ -490,7 +491,7 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
             .explain(verbose, analyze)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -529,8 +530,8 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn intersect(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
-        let left_plan = self.to_logical_plan();
-        let right_plan = dataframe.to_logical_plan();
+        let left_plan = self.plan.clone();
+        let right_plan = dataframe.plan.clone();
         Ok(Arc::new(DataFrame::new(
             self.session_state.clone(),
             &LogicalPlanBuilder::intersect(left_plan, right_plan, true)?,
@@ -551,8 +552,8 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn except(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
-        let left_plan = self.to_logical_plan();
-        let right_plan = dataframe.to_logical_plan();
+        let left_plan = self.plan.clone();
+        let right_plan = dataframe.plan.clone();
 
         Ok(Arc::new(DataFrame::new(
             self.session_state.clone(),
@@ -635,7 +636,8 @@ impl TableProvider for DataFrame {
             self.session_state.clone(),
             &limit
                 .map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
-                .to_logical_plan(),
+                .plan
+                .clone(),
         )
         .create_physical_plan()
         .await
@@ -663,7 +665,7 @@ mod tests {
 
         let t = test_table().await?;
         let t2 = t.select_columns(&["c1", "c2", "c11"])?;
-        let plan = t2.to_logical_plan();
+        let plan = t2.plan.clone();
 
         // build query using SQL
         let sql_plan = create_plan("SELECT c1, c2, c11 FROM aggregate_test_100").await?;
@@ -679,7 +681,7 @@ mod tests {
         // build plan using Table API
         let t = test_table().await?;
         let t2 = t.select(vec![col("c1"), col("c2"), col("c11")])?;
-        let plan = t2.to_logical_plan();
+        let plan = t2.plan.clone();
 
         // build query using SQL
         let sql_plan = create_plan("SELECT c1, c2, c11 FROM aggregate_test_100").await?;
@@ -702,7 +704,7 @@ mod tests {
             window_frame: None,
         };
         let t2 = t.select(vec![col("c1"), first_row])?;
-        let plan = t2.to_logical_plan();
+        let plan = t2.plan.clone();
 
         let sql_plan = create_plan(
             "select c1, first_value(c1) over (partition by c2) from aggregate_test_100",
@@ -768,7 +770,7 @@ mod tests {
         // build query using Table API
         let t = test_table().await?;
         let t2 = t.select_columns(&["c1", "c2", "c11"])?.limit(10)?;
-        let plan = t2.to_logical_plan();
+        let plan = t2.plan.clone();
 
         // build query using SQL
         let sql_plan =
@@ -788,7 +790,7 @@ mod tests {
             .select_columns(&["c1", "c2", "c11"])?
             .limit(10)?
             .explain(false, false)?;
-        let plan = df.to_logical_plan();
+        let plan = df.plan.clone();
 
         // build query using SQL
         let sql_plan =
@@ -825,7 +827,7 @@ mod tests {
         let f = df.registry();
 
         let df = df.select(vec![f.udf("my_fn")?.call(vec![col("c12")])])?;
-        let plan = df.to_logical_plan();
+        let plan = df.plan.clone();
 
         // build query using SQL
         let sql_plan =
@@ -852,7 +854,7 @@ mod tests {
     async fn intersect() -> Result<()> {
         let df = test_table().await?.select_columns(&["c1", "c3"])?;
         let plan = df.intersect(df.clone())?;
-        let result = plan.to_logical_plan();
+        let result = plan.plan.clone();
         let expected = create_plan(
             "SELECT c1, c3 FROM aggregate_test_100
             INTERSECT ALL SELECT c1, c3 FROM aggregate_test_100",
@@ -866,7 +868,7 @@ mod tests {
     async fn except() -> Result<()> {
         let df = test_table().await?.select_columns(&["c1", "c3"])?;
         let plan = df.except(df.clone())?;
-        let result = plan.to_logical_plan();
+        let result = plan.plan.clone();
         let expected = create_plan(
             "SELECT c1, c3 FROM aggregate_test_100
             EXCEPT ALL SELECT c1, c3 FROM aggregate_test_100",
@@ -880,7 +882,7 @@ mod tests {
     async fn register_table() -> Result<()> {
         let df = test_table().await?.select_columns(&["c1", "c12"])?;
         let ctx = SessionContext::new();
-        let df_impl = Arc::new(DataFrame::new(ctx.state.clone(), &df.to_logical_plan()));
+        let df_impl = Arc::new(DataFrame::new(ctx.state.clone(), &df.plan.clone()));
 
         // register a dataframe as a table
         ctx.register_table("test_table", df_impl.clone())?;
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 33019ff4a..01a5eefa9 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -400,10 +400,7 @@ impl SessionContext {
                 }
             }
 
-            plan => Ok(Arc::new(DataFrame::new(
-                self.state.clone(),
-                &self.optimize(&plan)?,
-            ))),
+            plan => Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))),
         }
     }
 
@@ -1361,7 +1358,8 @@ impl SessionState {
         logical_plan: &LogicalPlan,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let planner = self.query_planner.clone();
-        planner.create_physical_plan(logical_plan, self).await
+        let logical_plan = self.optimize(logical_plan)?;
+        planner.create_physical_plan(&logical_plan, self).await
     }
 }
 
@@ -1887,26 +1885,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn ctx_sql_should_optimize_plan() -> Result<()> {
-        let ctx = SessionContext::new();
-        let plan1 = ctx
-            .create_logical_plan("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
-
-        let opt_plan1 = ctx.optimize(&plan1)?;
-
-        let plan2 = ctx
-            .sql("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")
-            .await?;
-
-        assert_eq!(
-            format!("{:?}", opt_plan1),
-            format!("{:?}", plan2.to_logical_plan())
-        );
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index 0916e966c..dbaaca206 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -204,7 +204,7 @@ async fn custom_source_dataframe() -> Result<()> {
     let ctx = SessionContext::new();
 
     let table = ctx.read_table(Arc::new(CustomTableProvider))?;
-    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
+    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
         .project(vec![col("c2")])?
         .build()?;
 
@@ -258,7 +258,7 @@ async fn optimizers_catch_all_statistics() {
         .unwrap();
 
     let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan())
+        .create_physical_plan(&df.to_logical_plan().unwrap())
         .await
         .unwrap();
 
diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs
index 5ee4fcca4..0d580f2d2 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -557,7 +557,13 @@ impl ContextWithParquet {
     /// rows and normalized execution metrics
     async fn query(&mut self, sql: &str) -> TestOutput {
         println!("Planning sql {}", sql);
-        let logical_plan = self.ctx.sql(sql).await.expect("planning").to_logical_plan();
+        let logical_plan = self
+            .ctx
+            .sql(sql)
+            .await
+            .expect("planning")
+            .to_logical_plan()
+            .unwrap();
         self.run_test(logical_plan, sql).await
     }
 
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 2e4000552..4297b12b6 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -339,7 +339,7 @@ async fn parquet_statistics() -> Result<()> {
     .await;
 
     //// NO PROJECTION ////
-    let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan();
+    let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan()?;
 
     let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
     assert_eq!(physical_plan.schema().fields().len(), 4);
@@ -360,7 +360,7 @@ async fn parquet_statistics() -> Result<()> {
     let logical_plan = ctx
         .sql("SELECT mycol, day FROM t WHERE day='28'")
         .await?
-        .to_logical_plan();
+        .to_logical_plan()?;
 
     let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
     assert_eq!(physical_plan.schema().fields().len(), 2);
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index fb3919ed7..e9c6eb52e 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -159,7 +159,7 @@ async fn projection_on_table_scan() -> Result<()> {
     let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
 
     let table = ctx.table("test")?;
-    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
+    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
         .project(vec![col("c2")])?
         .build()?;
 
diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs
index 178a47d61..024bb20cb 100644
--- a/datafusion/core/tests/sql/udf.rs
+++ b/datafusion/core/tests/sql/udf.rs
@@ -82,7 +82,7 @@ async fn scalar_udf() -> Result<()> {
 
     let t = ctx.table("t")?;
 
-    let plan = LogicalPlanBuilder::from(t.to_logical_plan())
+    let plan = LogicalPlanBuilder::from(t.to_logical_plan()?)
         .project(vec![
             col("a"),
             col("b"),
@@ -92,7 +92,7 @@ async fn scalar_udf() -> Result<()> {
 
     assert_eq!(
         format!("{:?}", plan),
-        "Projection: #t.a, #t.b, my_add(#t.a, #t.b)\n  TableScan: t projection=None"
+        "Projection: #t.a, #t.b, my_add(#t.a, #t.b)\n  TableScan: t projection=Some([0, 1])"
     );
 
     let plan = ctx.optimize(&plan)?;
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index 1170dc7cb..d57c218d4 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -209,7 +209,7 @@ async fn sql_basic() -> Result<()> {
     let df = ctx.sql("SELECT * from stats_table").await.unwrap();
 
     let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan())
+        .create_physical_plan(&df.to_logical_plan()?)
         .await
         .unwrap();
 
@@ -230,7 +230,7 @@ async fn sql_filter() -> Result<()> {
         .unwrap();
 
     let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan())
+        .create_physical_plan(&df.to_logical_plan()?)
         .await
         .unwrap();
 
@@ -247,7 +247,7 @@ async fn sql_limit() -> Result<()> {
 
     let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap();
     let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan())
+        .create_physical_plan(&df.to_logical_plan()?)
         .await
         .unwrap();
     // when the limit is smaller than the original number of lines
@@ -266,7 +266,7 @@ async fn sql_limit() -> Result<()> {
         .await
         .unwrap();
     let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan())
+        .create_physical_plan(&df.to_logical_plan()?)
         .await
         .unwrap();
     // when the limit is larger than the original number of lines, statistics remain unchanged
@@ -286,7 +286,7 @@ async fn sql_window() -> Result<()> {
         .unwrap();
 
     let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan())
+        .create_physical_plan(&df.to_logical_plan()?)
         .await
         .unwrap();