You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/29 15:12:20 UTC
[arrow-datafusion] branch master updated: Stop optimizing queries twice (#2369)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6a69f529e Stop optimizing queries twice (#2369)
6a69f529e is described below
commit 6a69f529edb3087aeba57c9f01031a98ad06dd5d
Author: Andy Grove <ag...@apache.org>
AuthorDate: Fri Apr 29 09:12:14 2022 -0600
Stop optimizing queries twice (#2369)
---
ballista/rust/client/src/context.rs | 6 +-
ballista/rust/scheduler/src/planner.rs | 6 +-
.../rust/scheduler/src/scheduler_server/grpc.rs | 11 +--
benchmarks/src/bin/tpch.rs | 2 +-
datafusion/core/Cargo.toml | 4 +
datafusion/core/benches/sql_planner.rs | 93 ++++++++++++++++++++++
datafusion/core/src/dataframe.rs | 64 +++++++--------
datafusion/core/src/execution/context.rs | 28 +------
datafusion/core/tests/custom_sources.rs | 4 +-
datafusion/core/tests/parquet_pruning.rs | 8 +-
datafusion/core/tests/path_partition.rs | 4 +-
datafusion/core/tests/sql/projection.rs | 2 +-
datafusion/core/tests/sql/udf.rs | 4 +-
datafusion/core/tests/statistics.rs | 10 +--
14 files changed, 165 insertions(+), 81 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 5f77b369e..a413c5289 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -270,7 +270,7 @@ impl BallistaContext {
path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
- match self.read_csv(path, options).await?.to_logical_plan() {
+ match self.read_csv(path, options).await?.to_logical_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
@@ -284,7 +284,7 @@ impl BallistaContext {
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
- match self.read_parquet(path, options).await?.to_logical_plan() {
+ match self.read_parquet(path, options).await?.to_logical_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
@@ -298,7 +298,7 @@ impl BallistaContext {
path: &str,
options: AvroReadOptions<'_>,
) -> Result<()> {
- match self.read_avro(path, options).await?.to_logical_plan() {
+ match self.read_avro(path, options).await?.to_logical_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 8198c4ed2..07168adb6 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -311,7 +311,7 @@ mod test {
)
.await?;
- let plan = df.to_logical_plan();
+ let plan = df.to_logical_plan()?;
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;
@@ -425,7 +425,7 @@ order by
)
.await?;
- let plan = df.to_logical_plan();
+ let plan = df.to_logical_plan()?;
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;
@@ -573,7 +573,7 @@ order by
)
.await?;
- let plan = df.to_logical_plan();
+ let plan = df.to_logical_plan()?;
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 27fe64c4c..ce2c88ccc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -380,14 +380,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
error!("{}", msg);
tonic::Status::internal(msg)
})?,
- Query::Sql(sql) => {
- let df = df_session.sql(&sql).await.map_err(|e| {
+ Query::Sql(sql) => df_session
+ .sql(&sql)
+ .await
+ .and_then(|df| df.to_logical_plan())
+ .map_err(|e| {
let msg = format!("Error parsing SQL: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
- })?;
- df.to_logical_plan()
- }
+ })?,
};
debug!("Received plan for execution: {:?}", plan);
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1060bd2e0..ef8abb01c 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -655,7 +655,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
}
// create the physical plan
- let csv = csv.to_logical_plan();
+ let csv = csv.to_logical_plan()?;
let csv = ctx.optimize(&csv)?;
let csv = ctx.create_physical_plan(&csv).await?;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 533b38b81..a119316a8 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -122,6 +122,10 @@ name = "physical_plan"
harness = false
name = "parquet_query_sql"
+[[bench]]
+harness = false
+name = "sql_planner"
+
[[bench]]
harness = false
name = "jit"
diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
new file mode 100644
index 000000000..ecdd59d8b
--- /dev/null
+++ b/datafusion/core/benches/sql_planner.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate arrow;
+extern crate datafusion;
+
+mod data_utils;
+use crate::criterion::Criterion;
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::datasource::MemTable;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionContext;
+use parking_lot::Mutex;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+
+fn plan(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
+ let rt = Runtime::new().unwrap();
+ criterion::black_box(rt.block_on(ctx.lock().sql(sql)).unwrap());
+}
+
+/// Create schema representing a large table
+pub fn create_schema(column_prefix: &str) -> Schema {
+ let fields = (0..200)
+ .map(|i| Field::new(&format!("{}{}", column_prefix, i), DataType::Int32, true))
+ .collect();
+ Schema::new(fields)
+}
+
+pub fn create_table_provider(column_prefix: &str) -> Result<Arc<MemTable>> {
+ let schema = Arc::new(create_schema(column_prefix));
+ MemTable::try_new(schema, vec![]).map(Arc::new)
+}
+
+fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
+ let ctx = SessionContext::new();
+ ctx.register_table("t1", create_table_provider("a")?)?;
+ ctx.register_table("t2", create_table_provider("b")?)?;
+ Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let ctx = create_context().unwrap();
+
+ c.bench_function("trivial join low numbered columns", |b| {
+ b.iter(|| {
+ plan(
+ ctx.clone(),
+ "SELECT t1.a2, t2.b2 \
+ FROM t1, t2 WHERE a1 = b1",
+ )
+ })
+ });
+
+ c.bench_function("trivial join high numbered columns", |b| {
+ b.iter(|| {
+ plan(
+ ctx.clone(),
+ "SELECT t1.a99, t2.b99 \
+ FROM t1, t2 WHERE a199 = b199",
+ )
+ })
+ });
+
+ c.bench_function("aggregate with join", |b| {
+ b.iter(|| {
+ plan(
+ ctx.clone(),
+ "SELECT t1.a99, MIN(t2.b1), MAX(t2.b199), AVG(t2.b123), COUNT(t2.b73) \
+ FROM t1 JOIN t2 ON t1.a199 = t2.b199 GROUP BY t1.a99",
+ )
+ })
+ });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5a7017486..369c2ae93 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -86,8 +86,7 @@ impl DataFrame {
/// Create a physical plan
pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
let state = self.session_state.read().clone();
- let optimized_plan = state.optimize(&self.plan)?;
- state.create_physical_plan(&optimized_plan).await
+ state.create_physical_plan(&self.plan).await
}
/// Filter the DataFrame by column. Returns a new DataFrame only containing the
@@ -129,9 +128,9 @@ impl DataFrame {
pub fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<DataFrame>> {
let window_func_exprs = find_window_exprs(&expr_list);
let plan = if window_func_exprs.is_empty() {
- self.to_logical_plan()
+ self.plan.clone()
} else {
- LogicalPlanBuilder::window_plan(self.to_logical_plan(), window_func_exprs)?
+ LogicalPlanBuilder::window_plan(self.plan.clone(), window_func_exprs)?
};
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
@@ -155,7 +154,7 @@ impl DataFrame {
/// # }
/// ```
pub fn filter(&self, predicate: Expr) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.filter(predicate)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -184,7 +183,7 @@ impl DataFrame {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.aggregate(group_expr, aggr_expr)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -204,7 +203,7 @@ impl DataFrame {
/// # }
/// ```
pub fn limit(&self, n: usize) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.limit(n)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -224,8 +223,8 @@ impl DataFrame {
/// # }
/// ```
pub fn union(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
- .union(dataframe.to_logical_plan())?
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
+ .union(dataframe.plan.clone())?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
}
@@ -247,7 +246,7 @@ impl DataFrame {
pub fn distinct(&self) -> Result<Arc<DataFrame>> {
Ok(Arc::new(DataFrame::new(
self.session_state.clone(),
- &LogicalPlanBuilder::from(self.to_logical_plan())
+ &LogicalPlanBuilder::from(self.plan.clone())
.distinct()?
.build()?,
)))
@@ -268,7 +267,7 @@ impl DataFrame {
/// # }
/// ```
pub fn sort(&self, expr: Vec<Expr>) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.sort(expr)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -300,9 +299,9 @@ impl DataFrame {
left_cols: &[&str],
right_cols: &[&str],
) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.join(
- &right.to_logical_plan(),
+ &right.plan.clone(),
join_type,
(left_cols.to_vec(), right_cols.to_vec()),
)?
@@ -329,7 +328,7 @@ impl DataFrame {
&self,
partitioning_scheme: Partitioning,
) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.repartition(partitioning_scheme)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -470,8 +469,10 @@ impl DataFrame {
}
/// Return the logical plan represented by this DataFrame.
- pub fn to_logical_plan(&self) -> LogicalPlan {
- self.plan.clone()
+ pub fn to_logical_plan(&self) -> Result<LogicalPlan> {
+ // Optimize the plan first for better UX
+ let state = self.session_state.read().clone();
+ state.optimize(&self.plan)
}
/// Return a DataFrame with the explanation of its plan so far.
@@ -490,7 +491,7 @@ impl DataFrame {
/// # }
/// ```
pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(self.plan.clone())
.explain(verbose, analyze)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
@@ -529,8 +530,8 @@ impl DataFrame {
/// # }
/// ```
pub fn intersect(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
- let left_plan = self.to_logical_plan();
- let right_plan = dataframe.to_logical_plan();
+ let left_plan = self.plan.clone();
+ let right_plan = dataframe.plan.clone();
Ok(Arc::new(DataFrame::new(
self.session_state.clone(),
&LogicalPlanBuilder::intersect(left_plan, right_plan, true)?,
@@ -551,8 +552,8 @@ impl DataFrame {
/// # }
/// ```
pub fn except(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
- let left_plan = self.to_logical_plan();
- let right_plan = dataframe.to_logical_plan();
+ let left_plan = self.plan.clone();
+ let right_plan = dataframe.plan.clone();
Ok(Arc::new(DataFrame::new(
self.session_state.clone(),
@@ -635,7 +636,8 @@ impl TableProvider for DataFrame {
self.session_state.clone(),
&limit
.map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
- .to_logical_plan(),
+ .plan
+ .clone(),
)
.create_physical_plan()
.await
@@ -663,7 +665,7 @@ mod tests {
let t = test_table().await?;
let t2 = t.select_columns(&["c1", "c2", "c11"])?;
- let plan = t2.to_logical_plan();
+ let plan = t2.plan.clone();
// build query using SQL
let sql_plan = create_plan("SELECT c1, c2, c11 FROM aggregate_test_100").await?;
@@ -679,7 +681,7 @@ mod tests {
// build plan using Table API
let t = test_table().await?;
let t2 = t.select(vec![col("c1"), col("c2"), col("c11")])?;
- let plan = t2.to_logical_plan();
+ let plan = t2.plan.clone();
// build query using SQL
let sql_plan = create_plan("SELECT c1, c2, c11 FROM aggregate_test_100").await?;
@@ -702,7 +704,7 @@ mod tests {
window_frame: None,
};
let t2 = t.select(vec![col("c1"), first_row])?;
- let plan = t2.to_logical_plan();
+ let plan = t2.plan.clone();
let sql_plan = create_plan(
"select c1, first_value(c1) over (partition by c2) from aggregate_test_100",
@@ -768,7 +770,7 @@ mod tests {
// build query using Table API
let t = test_table().await?;
let t2 = t.select_columns(&["c1", "c2", "c11"])?.limit(10)?;
- let plan = t2.to_logical_plan();
+ let plan = t2.plan.clone();
// build query using SQL
let sql_plan =
@@ -788,7 +790,7 @@ mod tests {
.select_columns(&["c1", "c2", "c11"])?
.limit(10)?
.explain(false, false)?;
- let plan = df.to_logical_plan();
+ let plan = df.plan.clone();
// build query using SQL
let sql_plan =
@@ -825,7 +827,7 @@ mod tests {
let f = df.registry();
let df = df.select(vec![f.udf("my_fn")?.call(vec![col("c12")])])?;
- let plan = df.to_logical_plan();
+ let plan = df.plan.clone();
// build query using SQL
let sql_plan =
@@ -852,7 +854,7 @@ mod tests {
async fn intersect() -> Result<()> {
let df = test_table().await?.select_columns(&["c1", "c3"])?;
let plan = df.intersect(df.clone())?;
- let result = plan.to_logical_plan();
+ let result = plan.plan.clone();
let expected = create_plan(
"SELECT c1, c3 FROM aggregate_test_100
INTERSECT ALL SELECT c1, c3 FROM aggregate_test_100",
@@ -866,7 +868,7 @@ mod tests {
async fn except() -> Result<()> {
let df = test_table().await?.select_columns(&["c1", "c3"])?;
let plan = df.except(df.clone())?;
- let result = plan.to_logical_plan();
+ let result = plan.plan.clone();
let expected = create_plan(
"SELECT c1, c3 FROM aggregate_test_100
EXCEPT ALL SELECT c1, c3 FROM aggregate_test_100",
@@ -880,7 +882,7 @@ mod tests {
async fn register_table() -> Result<()> {
let df = test_table().await?.select_columns(&["c1", "c12"])?;
let ctx = SessionContext::new();
- let df_impl = Arc::new(DataFrame::new(ctx.state.clone(), &df.to_logical_plan()));
+ let df_impl = Arc::new(DataFrame::new(ctx.state.clone(), &df.plan.clone()));
// register a dataframe as a table
ctx.register_table("test_table", df_impl.clone())?;
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 33019ff4a..01a5eefa9 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -400,10 +400,7 @@ impl SessionContext {
}
}
- plan => Ok(Arc::new(DataFrame::new(
- self.state.clone(),
- &self.optimize(&plan)?,
- ))),
+ plan => Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))),
}
}
@@ -1361,7 +1358,8 @@ impl SessionState {
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = self.query_planner.clone();
- planner.create_physical_plan(logical_plan, self).await
+ let logical_plan = self.optimize(logical_plan)?;
+ planner.create_physical_plan(&logical_plan, self).await
}
}
@@ -1887,26 +1885,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn ctx_sql_should_optimize_plan() -> Result<()> {
- let ctx = SessionContext::new();
- let plan1 = ctx
- .create_logical_plan("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
-
- let opt_plan1 = ctx.optimize(&plan1)?;
-
- let plan2 = ctx
- .sql("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")
- .await?;
-
- assert_eq!(
- format!("{:?}", opt_plan1),
- format!("{:?}", plan2.to_logical_plan())
- );
-
- Ok(())
- }
-
#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index 0916e966c..dbaaca206 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -204,7 +204,7 @@ async fn custom_source_dataframe() -> Result<()> {
let ctx = SessionContext::new();
let table = ctx.read_table(Arc::new(CustomTableProvider))?;
- let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
+ let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
.project(vec![col("c2")])?
.build()?;
@@ -258,7 +258,7 @@ async fn optimizers_catch_all_statistics() {
.unwrap();
let physical_plan = ctx
- .create_physical_plan(&df.to_logical_plan())
+ .create_physical_plan(&df.to_logical_plan().unwrap())
.await
.unwrap();
diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs
index 5ee4fcca4..0d580f2d2 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -557,7 +557,13 @@ impl ContextWithParquet {
/// rows and normalized execution metrics
async fn query(&mut self, sql: &str) -> TestOutput {
println!("Planning sql {}", sql);
- let logical_plan = self.ctx.sql(sql).await.expect("planning").to_logical_plan();
+ let logical_plan = self
+ .ctx
+ .sql(sql)
+ .await
+ .expect("planning")
+ .to_logical_plan()
+ .unwrap();
self.run_test(logical_plan, sql).await
}
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 2e4000552..4297b12b6 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -339,7 +339,7 @@ async fn parquet_statistics() -> Result<()> {
.await;
//// NO PROJECTION ////
- let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan();
+ let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan()?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
assert_eq!(physical_plan.schema().fields().len(), 4);
@@ -360,7 +360,7 @@ async fn parquet_statistics() -> Result<()> {
let logical_plan = ctx
.sql("SELECT mycol, day FROM t WHERE day='28'")
.await?
- .to_logical_plan();
+ .to_logical_plan()?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
assert_eq!(physical_plan.schema().fields().len(), 2);
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index fb3919ed7..e9c6eb52e 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -159,7 +159,7 @@ async fn projection_on_table_scan() -> Result<()> {
let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
let table = ctx.table("test")?;
- let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
+ let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
.project(vec![col("c2")])?
.build()?;
diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs
index 178a47d61..024bb20cb 100644
--- a/datafusion/core/tests/sql/udf.rs
+++ b/datafusion/core/tests/sql/udf.rs
@@ -82,7 +82,7 @@ async fn scalar_udf() -> Result<()> {
let t = ctx.table("t")?;
- let plan = LogicalPlanBuilder::from(t.to_logical_plan())
+ let plan = LogicalPlanBuilder::from(t.to_logical_plan()?)
.project(vec![
col("a"),
col("b"),
@@ -92,7 +92,7 @@ async fn scalar_udf() -> Result<()> {
assert_eq!(
format!("{:?}", plan),
- "Projection: #t.a, #t.b, my_add(#t.a, #t.b)\n TableScan: t projection=None"
+ "Projection: #t.a, #t.b, my_add(#t.a, #t.b)\n TableScan: t projection=Some([0, 1])"
);
let plan = ctx.optimize(&plan)?;
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index 1170dc7cb..d57c218d4 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -209,7 +209,7 @@ async fn sql_basic() -> Result<()> {
let df = ctx.sql("SELECT * from stats_table").await.unwrap();
let physical_plan = ctx
- .create_physical_plan(&df.to_logical_plan())
+ .create_physical_plan(&df.to_logical_plan()?)
.await
.unwrap();
@@ -230,7 +230,7 @@ async fn sql_filter() -> Result<()> {
.unwrap();
let physical_plan = ctx
- .create_physical_plan(&df.to_logical_plan())
+ .create_physical_plan(&df.to_logical_plan()?)
.await
.unwrap();
@@ -247,7 +247,7 @@ async fn sql_limit() -> Result<()> {
let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap();
let physical_plan = ctx
- .create_physical_plan(&df.to_logical_plan())
+ .create_physical_plan(&df.to_logical_plan()?)
.await
.unwrap();
// when the limit is smaller than the original number of lines
@@ -266,7 +266,7 @@ async fn sql_limit() -> Result<()> {
.await
.unwrap();
let physical_plan = ctx
- .create_physical_plan(&df.to_logical_plan())
+ .create_physical_plan(&df.to_logical_plan()?)
.await
.unwrap();
// when the limit is larger than the original number of lines, statistics remain unchanged
@@ -286,7 +286,7 @@ async fn sql_window() -> Result<()> {
.unwrap();
let physical_plan = ctx
- .create_physical_plan(&df.to_logical_plan())
+ .create_physical_plan(&df.to_logical_plan()?)
.await
.unwrap();