You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/21 09:57:34 UTC
[arrow-datafusion] branch master updated: Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async (#1013)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 299ab7d Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async (#1013)
299ab7d is described below
commit 299ab7d1c37c707fcd500d3428abbdbe4dc5399b
Author: rdettai <rd...@gmail.com>
AuthorDate: Tue Sep 21 11:57:27 2021 +0200
Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async (#1013)
* [feat] make TableProvider.scan() async
* [fix] python bindings
* [fix] phantom files
* [fix] removed parallelization
* [fix] remove async from sql after rebase
---
ballista/rust/core/src/utils.rs | 4 +-
ballista/rust/scheduler/src/lib.rs | 2 +
ballista/rust/scheduler/src/planner.rs | 205 +++---
benchmarks/src/bin/nyctaxi.rs | 2 +-
benchmarks/src/bin/tpch.rs | 14 +-
datafusion/src/datasource/avro.rs | 8 +-
datafusion/src/datasource/csv.rs | 4 +-
datafusion/src/datasource/datasource.rs | 5 +-
datafusion/src/datasource/empty.rs | 4 +-
datafusion/src/datasource/json.rs | 5 +-
datafusion/src/datasource/memory.rs | 18 +-
datafusion/src/datasource/parquet.rs | 8 +-
datafusion/src/execution/context.rs | 79 +-
datafusion/src/execution/dataframe_impl.rs | 2 +-
datafusion/src/optimizer/filter_push_down.rs | 4 +-
datafusion/src/physical_plan/mod.rs | 45 +-
datafusion/src/physical_plan/planner.rs | 1025 +++++++++++++-------------
datafusion/tests/custom_sources.rs | 10 +-
datafusion/tests/parquet_pruning.rs | 1 +
datafusion/tests/provider_filter_pushdown.rs | 3 +-
datafusion/tests/sql.rs | 26 +-
datafusion/tests/statistics.rs | 28 +-
datafusion/tests/user_defined_plan.rs | 7 +-
python/src/dataframe.rs | 31 +-
24 files changed, 822 insertions(+), 718 deletions(-)
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index c731e60..fd12eb9 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;
use crate::config::BallistaConfig;
+use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
@@ -269,8 +270,9 @@ impl BallistaQueryPlanner {
}
}
+#[async_trait]
impl QueryPlanner for BallistaQueryPlanner {
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
_ctx_state: &ExecutionContextState,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index f03d08b..47caf4c 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -418,6 +418,7 @@ impl SchedulerGrpc for SchedulerServer {
let plan = fail_job!(datafusion_ctx
.create_physical_plan(&optimized_plan)
+ .await
.map_err(|e| {
let msg = format!("Could not create physical plan: {}", e);
error!("{}", msg);
@@ -447,6 +448,7 @@ impl SchedulerGrpc for SchedulerServer {
let mut planner = DistributedPlanner::new();
let stages = fail_job!(planner
.plan_query_stages(&job_id_spawn, plan)
+ .await
.map_err(|e| {
let msg = format!("Could not plan query stages: {}", e);
error!("{}", msg);
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 2872ff9..3d5712a 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -31,6 +31,8 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use futures::future::BoxFuture;
+use futures::FutureExt;
use log::info;
type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -55,14 +57,15 @@ impl DistributedPlanner {
/// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
/// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
/// A [ShuffleWriterExec] is created whenever the partitioning changes.
- pub fn plan_query_stages(
- &mut self,
- job_id: &str,
+ pub async fn plan_query_stages<'a>(
+ &'a mut self,
+ job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Arc<ShuffleWriterExec>>> {
info!("planning query stages");
- let (new_plan, mut stages) =
- self.plan_query_stages_internal(job_id, execution_plan)?;
+ let (new_plan, mut stages) = self
+ .plan_query_stages_internal(job_id, execution_plan)
+ .await?;
stages.push(create_shuffle_writer(
job_id,
self.next_stage_id(),
@@ -75,91 +78,95 @@ impl DistributedPlanner {
/// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
/// This function is needed because the input execution_plan might need to be modified, but it might not hold a
/// complete query stage (its parent might also belong to the same stage)
- fn plan_query_stages_internal(
- &mut self,
- job_id: &str,
+ fn plan_query_stages_internal<'a>(
+ &'a mut self,
+ job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
- ) -> Result<PartialQueryStageResult> {
- // recurse down and replace children
- if execution_plan.children().is_empty() {
- return Ok((execution_plan, vec![]));
- }
+ ) -> BoxFuture<'a, Result<PartialQueryStageResult>> {
+ async move {
+ // recurse down and replace children
+ if execution_plan.children().is_empty() {
+ return Ok((execution_plan, vec![]));
+ }
- let mut stages = vec![];
- let mut children = vec![];
- for child in execution_plan.children() {
- let (new_child, mut child_stages) =
- self.plan_query_stages_internal(job_id, child.clone())?;
- children.push(new_child);
- stages.append(&mut child_stages);
- }
+ let mut stages = vec![];
+ let mut children = vec![];
+ for child in execution_plan.children() {
+ let (new_child, mut child_stages) = self
+ .plan_query_stages_internal(job_id, child.clone())
+ .await?;
+ children.push(new_child);
+ stages.append(&mut child_stages);
+ }
- if let Some(coalesce) = execution_plan
- .as_any()
- .downcast_ref::<CoalescePartitionsExec>()
- {
- let shuffle_writer = create_shuffle_writer(
- job_id,
- self.next_stage_id(),
- children[0].clone(),
- None,
- )?;
- let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- shuffle_writer.stage_id(),
- shuffle_writer.schema(),
- shuffle_writer.output_partitioning().partition_count(),
- shuffle_writer
- .shuffle_output_partitioning()
- .map(|p| p.partition_count())
- .unwrap_or_else(|| {
- shuffle_writer.output_partitioning().partition_count()
- }),
- ));
- stages.push(shuffle_writer);
- Ok((
- coalesce.with_new_children(vec![unresolved_shuffle])?,
- stages,
- ))
- } else if let Some(repart) =
- execution_plan.as_any().downcast_ref::<RepartitionExec>()
- {
- match repart.output_partitioning() {
- Partitioning::Hash(_, _) => {
- let shuffle_writer = create_shuffle_writer(
- job_id,
- self.next_stage_id(),
- children[0].clone(),
- Some(repart.partitioning().to_owned()),
- )?;
- let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- shuffle_writer.stage_id(),
- shuffle_writer.schema(),
- shuffle_writer.output_partitioning().partition_count(),
- shuffle_writer
- .shuffle_output_partitioning()
- .map(|p| p.partition_count())
- .unwrap_or_else(|| {
- shuffle_writer.output_partitioning().partition_count()
- }),
- ));
- stages.push(shuffle_writer);
- Ok((unresolved_shuffle, stages))
- }
- _ => {
- // remove any non-hash repartition from the distributed plan
- Ok((children[0].clone(), stages))
+ if let Some(coalesce) = execution_plan
+ .as_any()
+ .downcast_ref::<CoalescePartitionsExec>()
+ {
+ let shuffle_writer = create_shuffle_writer(
+ job_id,
+ self.next_stage_id(),
+ children[0].clone(),
+ None,
+ )?;
+ let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+ shuffle_writer.stage_id(),
+ shuffle_writer.schema(),
+ shuffle_writer.output_partitioning().partition_count(),
+ shuffle_writer
+ .shuffle_output_partitioning()
+ .map(|p| p.partition_count())
+ .unwrap_or_else(|| {
+ shuffle_writer.output_partitioning().partition_count()
+ }),
+ ));
+ stages.push(shuffle_writer);
+ Ok((
+ coalesce.with_new_children(vec![unresolved_shuffle])?,
+ stages,
+ ))
+ } else if let Some(repart) =
+ execution_plan.as_any().downcast_ref::<RepartitionExec>()
+ {
+ match repart.output_partitioning() {
+ Partitioning::Hash(_, _) => {
+ let shuffle_writer = create_shuffle_writer(
+ job_id,
+ self.next_stage_id(),
+ children[0].clone(),
+ Some(repart.partitioning().to_owned()),
+ )?;
+ let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+ shuffle_writer.stage_id(),
+ shuffle_writer.schema(),
+ shuffle_writer.output_partitioning().partition_count(),
+ shuffle_writer
+ .shuffle_output_partitioning()
+ .map(|p| p.partition_count())
+ .unwrap_or_else(|| {
+ shuffle_writer.output_partitioning().partition_count()
+ }),
+ ));
+ stages.push(shuffle_writer);
+ Ok((unresolved_shuffle, stages))
+ }
+ _ => {
+ // remove any non-hash repartition from the distributed plan
+ Ok((children[0].clone(), stages))
+ }
}
+ } else if let Some(window) =
+ execution_plan.as_any().downcast_ref::<WindowAggExec>()
+ {
+ Err(BallistaError::NotImplemented(format!(
+ "WindowAggExec with window {:?}",
+ window
+ )))
+ } else {
+ Ok((execution_plan.with_new_children(children)?, stages))
}
- } else if let Some(window) =
- execution_plan.as_any().downcast_ref::<WindowAggExec>()
- {
- Err(BallistaError::NotImplemented(format!(
- "WindowAggExec with window {:?}",
- window
- )))
- } else {
- Ok((execution_plan.with_new_children(children)?, stages))
}
+ .boxed()
}
/// Generate a new stage ID
@@ -262,8 +269,8 @@ mod test {
};
}
- #[test]
- fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
+ #[tokio::test]
+ async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;
// simplified form of TPC-H query 1
@@ -276,11 +283,13 @@ mod test {
let plan = df.to_logical_plan();
let plan = ctx.optimize(&plan)?;
- let plan = ctx.create_physical_plan(&plan)?;
+ let plan = ctx.create_physical_plan(&plan).await?;
let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
- let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
+ let stages = planner
+ .plan_query_stages(&job_uuid.to_string(), plan)
+ .await?;
for stage in &stages {
println!("{}", displayable(stage.as_ref()).indent().to_string());
}
@@ -345,8 +354,8 @@ mod test {
Ok(())
}
- #[test]
- fn distributed_join_plan() -> Result<(), BallistaError> {
+ #[tokio::test]
+ async fn distributed_join_plan() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;
// simplified form of TPC-H query 12
@@ -386,11 +395,13 @@ order by
let plan = df.to_logical_plan();
let plan = ctx.optimize(&plan)?;
- let plan = ctx.create_physical_plan(&plan)?;
+ let plan = ctx.create_physical_plan(&plan).await?;
let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
- let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
+ let stages = planner
+ .plan_query_stages(&job_uuid.to_string(), plan)
+ .await?;
for stage in &stages {
println!("{}", displayable(stage.as_ref()).indent().to_string());
}
@@ -516,8 +527,8 @@ order by
Ok(())
}
- #[test]
- fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+ #[tokio::test]
+ async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;
// simplified form of TPC-H query 1
@@ -530,11 +541,13 @@ order by
let plan = df.to_logical_plan();
let plan = ctx.optimize(&plan)?;
- let plan = ctx.create_physical_plan(&plan)?;
+ let plan = ctx.create_physical_plan(&plan).await?;
let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
- let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
+ let stages = planner
+ .plan_query_stages(&job_uuid.to_string(), plan)
+ .await?;
let partial_hash = stages[0].children()[0].clone();
let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index 9387530..a88494f 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -121,7 +121,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu
if debug {
println!("Optimized logical plan:\n{:?}", plan);
}
- let physical_plan = ctx.create_physical_plan(&plan)?;
+ let physical_plan = ctx.create_physical_plan(&plan).await?;
let result = collect(physical_plan).await?;
if debug {
pretty::print_batches(&result)?;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 852e499..2e9b2ff 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -348,7 +348,7 @@ async fn execute_query(
if debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
- let physical_plan = ctx.create_physical_plan(&plan)?;
+ let physical_plan = ctx.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
@@ -394,7 +394,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
// create the physical plan
let csv = csv.to_logical_plan();
let csv = ctx.optimize(&csv)?;
- let csv = ctx.create_physical_plan(&csv)?;
+ let csv = ctx.create_physical_plan(&csv).await?;
let output_path = output_root_path.join(table);
let output_path = output_path.to_str().unwrap().to_owned();
@@ -1063,7 +1063,7 @@ mod tests {
use datafusion::physical_plan::ExecutionPlan;
use std::convert::TryInto;
- fn round_trip_query(n: usize) -> Result<()> {
+ async fn round_trip_query(n: usize) -> Result<()> {
let config = ExecutionConfig::new()
.with_target_partitions(1)
.with_batch_size(10);
@@ -1110,7 +1110,7 @@ mod tests {
// test physical plan roundtrip
if env::var("TPCH_DATA").is_ok() {
- let physical_plan = ctx.create_physical_plan(&plan)?;
+ let physical_plan = ctx.create_physical_plan(&plan).await?;
let proto: protobuf::PhysicalPlanNode =
(physical_plan.clone()).try_into().unwrap();
let round_trip: Arc<dyn ExecutionPlan> = (&proto).try_into().unwrap();
@@ -1126,9 +1126,9 @@ mod tests {
macro_rules! test_round_trip {
($tn:ident, $query:expr) => {
- #[test]
- fn $tn() -> Result<()> {
- round_trip_query($query)
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_query($query).await
}
};
}
diff --git a/datafusion/src/datasource/avro.rs b/datafusion/src/datasource/avro.rs
index ee0fabf..ee5cea5 100644
--- a/datafusion/src/datasource/avro.rs
+++ b/datafusion/src/datasource/avro.rs
@@ -27,6 +27,7 @@ use std::{
};
use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
use crate::{
@@ -120,6 +121,7 @@ impl AvroFile {
}
}
+#[async_trait]
impl TableProvider for AvroFile {
fn as_any(&self) -> &dyn Any {
self
@@ -129,7 +131,7 @@ impl TableProvider for AvroFile {
self.schema.clone()
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
@@ -185,7 +187,7 @@ mod tests {
async fn read_small_batches() -> Result<()> {
let table = load_table("alltypes_plain.avro")?;
let projection = None;
- let exec = table.scan(&projection, 2, &[], None)?;
+ let exec = table.scan(&projection, 2, &[], None).await?;
let stream = exec.execute(0).await?;
let _ = stream
@@ -414,7 +416,7 @@ mod tests {
table: Arc<dyn TableProvider>,
projection: &Option<Vec<usize>>,
) -> Result<RecordBatch> {
- let exec = table.scan(projection, 1024, &[], None)?;
+ let exec = table.scan(projection, 1024, &[], None).await?;
let mut it = exec.execute(0).await?;
it.next()
.await
diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 971bd91..d47312e 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -34,6 +34,7 @@
//! ```
use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
use std::any::Any;
use std::io::{Read, Seek};
use std::string::String;
@@ -157,6 +158,7 @@ impl CsvFile {
}
}
+#[async_trait]
impl TableProvider for CsvFile {
fn as_any(&self) -> &dyn Any {
self
@@ -166,7 +168,7 @@ impl TableProvider for CsvFile {
self.schema.clone()
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index 3c60255..918200f 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -20,6 +20,8 @@
use std::any::Any;
use std::sync::Arc;
+use async_trait::async_trait;
+
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
@@ -54,6 +56,7 @@ pub enum TableType {
}
/// Source table
+#[async_trait]
pub trait TableProvider: Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
@@ -68,7 +71,7 @@ pub trait TableProvider: Sync + Send {
}
/// Create an ExecutionPlan that will scan the table.
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
diff --git a/datafusion/src/datasource/empty.rs b/datafusion/src/datasource/empty.rs
index 183db76..380c5a7 100644
--- a/datafusion/src/datasource/empty.rs
+++ b/datafusion/src/datasource/empty.rs
@@ -21,6 +21,7 @@ use std::any::Any;
use std::sync::Arc;
use arrow::datatypes::*;
+use async_trait::async_trait;
use crate::datasource::TableProvider;
use crate::error::Result;
@@ -39,6 +40,7 @@ impl EmptyTable {
}
}
+#[async_trait]
impl TableProvider for EmptyTable {
fn as_any(&self) -> &dyn Any {
self
@@ -48,7 +50,7 @@ impl TableProvider for EmptyTable {
self.schema.clone()
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs
index f4e6782..1a6ec7a 100644
--- a/datafusion/src/datasource/json.rs
+++ b/datafusion/src/datasource/json.rs
@@ -36,6 +36,7 @@ use crate::{
},
};
use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};
+use async_trait::async_trait;
trait SeekRead: Read + Seek {}
@@ -101,6 +102,8 @@ impl NdJsonFile {
})
}
}
+
+#[async_trait]
impl TableProvider for NdJsonFile {
fn as_any(&self) -> &dyn Any {
self
@@ -110,7 +113,7 @@ impl TableProvider for NdJsonFile {
self.schema.clone()
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs
index 67b0e7b..b47e7e1 100644
--- a/datafusion/src/datasource/memory.rs
+++ b/datafusion/src/datasource/memory.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
@@ -66,7 +67,7 @@ impl MemTable {
output_partitions: Option<usize>,
) -> Result<Self> {
let schema = t.schema();
- let exec = t.scan(&None, batch_size, &[], None)?;
+ let exec = t.scan(&None, batch_size, &[], None).await?;
let partition_count = exec.output_partitioning().partition_count();
let tasks = (0..partition_count)
@@ -114,6 +115,7 @@ impl MemTable {
}
}
+#[async_trait]
impl TableProvider for MemTable {
fn as_any(&self) -> &dyn Any {
self
@@ -123,7 +125,7 @@ impl TableProvider for MemTable {
self.schema.clone()
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
@@ -168,7 +170,7 @@ mod tests {
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
// scan with projection
- let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None)?;
+ let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None).await?;
let mut it = exec.execute(0).await?;
let batch2 = it.next().await.unwrap()?;
assert_eq!(2, batch2.schema().fields().len());
@@ -198,7 +200,7 @@ mod tests {
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
- let exec = provider.scan(&None, 1024, &[], None)?;
+ let exec = provider.scan(&None, 1024, &[], None).await?;
let mut it = exec.execute(0).await?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
@@ -207,8 +209,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_invalid_projection() -> Result<()> {
+ #[tokio::test]
+ async fn test_invalid_projection() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
@@ -228,7 +230,7 @@ mod tests {
let projection: Vec<usize> = vec![0, 4];
- match provider.scan(&Some(projection), 1024, &[], None) {
+ match provider.scan(&Some(projection), 1024, &[], None).await {
Err(DataFusionError::Internal(e)) => {
assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
}
@@ -349,7 +351,7 @@ mod tests {
let provider =
MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
- let exec = provider.scan(&None, 1024, &[], None)?;
+ let exec = provider.scan(&None, 1024, &[], None).await?;
let mut it = exec.execute(0).await?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 8dc9bc5..65c9089 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -21,6 +21,7 @@ use std::any::Any;
use std::fs::File;
use std::sync::Arc;
+use async_trait::async_trait;
use parquet::arrow::ArrowReader;
use parquet::arrow::ParquetFileArrowReader;
use parquet::file::serialized_reader::SerializedFileReader;
@@ -110,6 +111,7 @@ impl ParquetTable {
}
}
+#[async_trait]
impl TableProvider for ParquetTable {
fn as_any(&self) -> &dyn Any {
self
@@ -129,7 +131,7 @@ impl TableProvider for ParquetTable {
/// Scan the file(s), using the provided projection, and return one BatchIterator per
/// partition.
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
@@ -414,7 +416,7 @@ mod tests {
async fn read_small_batches() -> Result<()> {
let table = load_table("alltypes_plain.parquet")?;
let projection = None;
- let exec = table.scan(&projection, 2, &[], None)?;
+ let exec = table.scan(&projection, 2, &[], None).await?;
let stream = exec.execute(0).await?;
let _ = stream
@@ -635,7 +637,7 @@ mod tests {
table: Arc<dyn TableProvider>,
projection: &Option<Vec<usize>>,
) -> Result<RecordBatch> {
- let exec = table.scan(projection, 1024, &[], None)?;
+ let exec = table.scan(projection, 1024, &[], None).await?;
let mut it = exec.execute(0).await?;
it.next()
.await
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 6789b79..00adbb0 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -81,6 +81,7 @@ use crate::sql::{
};
use crate::variable::{VarProvider, VarType};
use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
+use async_trait::async_trait;
use chrono::{DateTime, Utc};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
@@ -533,17 +534,27 @@ impl ExecutionContext {
}
/// Creates a physical plan from a logical plan.
- pub fn create_physical_plan(
+ pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
- let mut state = self.state.lock().unwrap();
- state.execution_props.start_execution();
+ let (state, planner) = {
+ let mut state = self.state.lock().unwrap();
+ state.execution_props.start_execution();
+
+ // We need to clone `state` to release the lock that is not `Send`. We could
+ // make the lock `Send` by using `tokio::sync::Mutex`, but that would require to
+ // propagate async even to the `LogicalPlan` building methods.
+ // Cloning `state` here is fine as we then pass it as immutable `&state`, which
+ // means that we avoid write consistency issues as the cloned version will not
+ // be written to. As for eventual modifications that would be applied to the
+ // original state after it has been cloned, they will not be picked up by the
+ // clone but that is okay, as it is equivalent to postponing the state update
+ // by keeping the lock until the end of the function scope.
+ (state.clone(), Arc::clone(&state.config.query_planner))
+ };
- state
- .config
- .query_planner
- .create_physical_plan(logical_plan, &state)
+ planner.create_physical_plan(logical_plan, &state).await
}
/// Executes a query and writes the results to a partitioned CSV file.
@@ -676,9 +687,10 @@ impl FunctionRegistry for ExecutionContext {
}
/// A planner used to add extensions to DataFusion logical and physical plans.
+#[async_trait]
pub trait QueryPlanner {
/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
@@ -688,15 +700,16 @@ pub trait QueryPlanner {
/// The query planner used if no user defined planner is provided
struct DefaultQueryPlanner {}
+#[async_trait]
impl QueryPlanner for DefaultQueryPlanner {
/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = DefaultPhysicalPlanner::default();
- planner.create_physical_plan(logical_plan, ctx_state)
+ planner.create_physical_plan(logical_plan, ctx_state).await
}
}
@@ -1054,6 +1067,7 @@ mod tests {
use arrow::compute::add;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
+ use async_trait::async_trait;
use std::fs::File;
use std::sync::Weak;
use std::thread::{self, JoinHandle};
@@ -1214,7 +1228,7 @@ mod tests {
ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+ let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
let results = collect_partitioned(physical_plan).await?;
@@ -1290,7 +1304,7 @@ mod tests {
\n TableScan: test projection=Some([1])";
assert_eq!(format!("{:?}", optimized_plan), expected);
- let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+ let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -1301,8 +1315,8 @@ mod tests {
Ok(())
}
- #[test]
- fn preserve_nullability_on_projection() -> Result<()> {
+ #[tokio::test]
+ async fn preserve_nullability_on_projection() -> Result<()> {
let tmp_dir = TempDir::new()?;
let ctx = create_ctx(&tmp_dir, 1)?;
@@ -1314,7 +1328,7 @@ mod tests {
.build()?;
let plan = ctx.optimize(&plan)?;
- let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
+ let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
Ok(())
}
@@ -1366,7 +1380,7 @@ mod tests {
);
assert_eq!(format!("{:?}", optimized_plan), expected);
- let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+ let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());
@@ -2442,8 +2456,8 @@ mod tests {
Ok(())
}
- #[test]
- fn aggregate_with_alias() -> Result<()> {
+ #[tokio::test]
+ async fn aggregate_with_alias() -> Result<()> {
let tmp_dir = TempDir::new()?;
let ctx = create_ctx(&tmp_dir, 1)?;
@@ -2459,7 +2473,7 @@ mod tests {
let plan = ctx.optimize(&plan)?;
- let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
+ let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
assert_eq!(
"total_salary",
@@ -2873,8 +2887,8 @@ mod tests {
Ok(())
}
- #[test]
- fn send_context_to_threads() -> Result<()> {
+ #[tokio::test]
+ async fn send_context_to_threads() -> Result<()> {
// ensure ExecutionContexts can be used in a multi-threaded
// environment. Usecase is for concurrent planing.
let tmp_dir = TempDir::new()?;
@@ -2900,8 +2914,8 @@ mod tests {
Ok(())
}
- #[test]
- fn ctx_sql_should_optimize_plan() -> Result<()> {
+ #[tokio::test]
+ async fn ctx_sql_should_optimize_plan() -> Result<()> {
let mut ctx = ExecutionContext::new();
let plan1 =
ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
@@ -2977,7 +2991,7 @@ mod tests {
);
let plan = ctx.optimize(&plan)?;
- let plan = ctx.create_physical_plan(&plan)?;
+ let plan = ctx.create_physical_plan(&plan).await?;
let result = collect(plan).await?;
let expected = vec![
@@ -3247,6 +3261,7 @@ mod tests {
async fn information_schema_tables_table_types() {
struct TestTable(TableType);
+ #[async_trait]
impl TableProvider for TestTable {
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -3260,7 +3275,7 @@ mod tests {
unimplemented!()
}
- fn scan(
+ async fn scan(
&self,
_: &Option<Vec<usize>>,
_: usize,
@@ -3764,8 +3779,9 @@ mod tests {
struct MyPhysicalPlanner {}
+ #[async_trait]
impl PhysicalPlanner for MyPhysicalPlanner {
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
_logical_plan: &LogicalPlan,
_ctx_state: &ExecutionContextState,
@@ -3788,14 +3804,17 @@ mod tests {
struct MyQueryPlanner {}
+ #[async_trait]
impl QueryPlanner for MyQueryPlanner {
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let physical_planner = MyPhysicalPlanner {};
- physical_planner.create_physical_plan(logical_plan, ctx_state)
+ physical_planner
+ .create_physical_plan(logical_plan, ctx_state)
+ .await
}
}
@@ -3822,7 +3841,7 @@ mod tests {
) -> Result<()> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+ let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
ctx.write_csv(physical_plan, out_dir.to_string()).await
}
@@ -3835,7 +3854,7 @@ mod tests {
) -> Result<()> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+ let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
ctx.write_parquet(physical_plan, out_dir.to_string(), writer_properties)
.await
}
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 724a3f8..cd39dd6 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -57,7 +57,7 @@ impl DataFrameImpl {
let state = self.ctx_state.lock().unwrap().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
- ctx.create_physical_plan(&plan)
+ ctx.create_physical_plan(&plan).await
}
}
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index a51fbc2..594e371 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -549,6 +549,7 @@ mod tests {
use crate::test::*;
use crate::{logical_plan::col, prelude::JoinType};
use arrow::datatypes::SchemaRef;
+ use async_trait::async_trait;
fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
@@ -1129,6 +1130,7 @@ mod tests {
pub filter_support: TableProviderFilterPushDown,
}
+ #[async_trait]
impl TableProvider for PushDownProvider {
fn schema(&self) -> SchemaRef {
Arc::new(arrow::datatypes::Schema::new(vec![
@@ -1140,7 +1142,7 @@ mod tests {
]))
}
- fn scan(
+ async fn scan(
&self,
_: &Option<Vec<usize>>,
_: usize,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index d12b217..f0b5622 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -193,31 +193,34 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// use datafusion::prelude::*;
/// use datafusion::physical_plan::displayable;
///
-/// // Hard code target_partitions as it appears in the RepartitionExec output
-/// let config = ExecutionConfig::new()
-/// .with_target_partitions(3);
-/// let mut ctx = ExecutionContext::with_config(config);
+/// #[tokio::main]
+/// async fn main() {
+/// // Hard code target_partitions as it appears in the RepartitionExec output
+/// let config = ExecutionConfig::new()
+/// .with_target_partitions(3);
+/// let mut ctx = ExecutionContext::with_config(config);
///
-/// // register the a table
-/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
+/// // register the a table
+/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
///
-/// // create a plan to run a SQL query
-/// let plan = ctx
-/// .create_logical_plan("SELECT a FROM example WHERE a < 5")
-/// .unwrap();
-/// let plan = ctx.optimize(&plan).unwrap();
-/// let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+/// // create a plan to run a SQL query
+/// let plan = ctx
+/// .create_logical_plan("SELECT a FROM example WHERE a < 5")
+/// .unwrap();
+/// let plan = ctx.optimize(&plan).unwrap();
+/// let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
///
-/// // Format using display string
-/// let displayable_plan = displayable(physical_plan.as_ref());
-/// let plan_string = format!("{}", displayable_plan.indent());
+/// // Format using display string
+/// let displayable_plan = displayable(physical_plan.as_ref());
+/// let plan_string = format!("{}", displayable_plan.indent());
///
-/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
-/// \n CoalesceBatchesExec: target_batch_size=4096\
-/// \n FilterExec: a@0 < 5\
-/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
-/// \n CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true",
-/// plan_string.trim());
+/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
+/// \n CoalesceBatchesExec: target_batch_size=4096\
+/// \n FilterExec: a@0 < 5\
+/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
+/// \n CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true",
+/// plan_string.trim());
+/// }
/// ```
///
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 55dc936..06f3a1d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -54,7 +54,10 @@ use crate::{
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::{compute::can_cast_types, datatypes::DataType};
+use async_trait::async_trait;
use expressions::col;
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt, TryStreamExt};
use log::debug;
use std::sync::Arc;
@@ -182,9 +185,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
+#[async_trait]
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
@@ -243,17 +247,18 @@ impl Default for DefaultPhysicalPlanner {
}
}
+#[async_trait]
impl PhysicalPlanner for DefaultPhysicalPlanner {
/// Create a physical plan from a logical plan
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
- match self.handle_explain(logical_plan, ctx_state)? {
+ match self.handle_explain(logical_plan, ctx_state).await? {
Some(plan) => Ok(plan),
None => {
- let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+ let plan = self.create_initial_plan(logical_plan, ctx_state).await?;
self.optimize_internal(plan, ctx_state, |_, _| {})
}
}
@@ -296,95 +301,343 @@ impl DefaultPhysicalPlanner {
}
/// Create a physical plan from a logical plan
- fn create_initial_plan(
- &self,
- logical_plan: &LogicalPlan,
- ctx_state: &ExecutionContextState,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let batch_size = ctx_state.config.batch_size;
-
- match logical_plan {
- LogicalPlan::TableScan {
- source,
- projection,
- filters,
- limit,
- ..
- } => {
- // Remove all qualifiers from the scan as the provider
- // doesn't know (nor should care) how the relation was
- // referred to in the query
- let filters = unnormalize_cols(filters.iter().cloned());
- source.scan(projection, batch_size, &filters, *limit)
- }
- LogicalPlan::Window {
- input, window_expr, ..
- } => {
- if window_expr.is_empty() {
- return Err(DataFusionError::Internal(
- "Impossibly got empty window expression".to_owned(),
- ));
+ fn create_initial_plan<'a>(
+ &'a self,
+ logical_plan: &'a LogicalPlan,
+ ctx_state: &'a ExecutionContextState,
+ ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+ async move {
+ let batch_size = ctx_state.config.batch_size;
+
+ let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
+ LogicalPlan::TableScan {
+ source,
+ projection,
+ filters,
+ limit,
+ ..
+ } => {
+ // Remove all qualifiers from the scan as the provider
+ // doesn't know (nor should care) how the relation was
+ // referred to in the query
+ let filters = unnormalize_cols(filters.iter().cloned());
+ source.scan(projection, batch_size, &filters, *limit).await
}
+ LogicalPlan::Window {
+ input, window_expr, ..
+ } => {
+ if window_expr.is_empty() {
+ return Err(DataFusionError::Internal(
+ "Impossibly got empty window expression".to_owned(),
+ ));
+ }
+
+ let input_exec = self.create_initial_plan(input, ctx_state).await?;
- let input_exec = self.create_initial_plan(input, ctx_state)?;
+ // at this moment we are guaranteed by the logical planner
+ // to have all the window_expr to have equal sort key
+ let partition_keys = window_expr_common_partition_keys(window_expr)?;
- // at this moment we are guaranteed by the logical planner
- // to have all the window_expr to have equal sort key
- let partition_keys = window_expr_common_partition_keys(window_expr)?;
+ let can_repartition = !partition_keys.is_empty()
+ && ctx_state.config.target_partitions > 1
+ && ctx_state.config.repartition_windows;
+
+ let input_exec = if can_repartition {
+ let partition_keys = partition_keys
+ .iter()
+ .map(|e| {
+ self.create_physical_expr(
+ e,
+ input.schema(),
+ &input_exec.schema(),
+ ctx_state,
+ )
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+ Arc::new(RepartitionExec::try_new(
+ input_exec,
+ Partitioning::Hash(
+ partition_keys,
+ ctx_state.config.target_partitions,
+ ),
+ )?)
+ } else {
+ input_exec
+ };
+
+ // add a sort phase
+ let get_sort_keys = |expr: &Expr| match expr {
+ Expr::WindowFunction {
+ ref partition_by,
+ ref order_by,
+ ..
+ } => generate_sort_key(partition_by, order_by),
+ _ => unreachable!(),
+ };
+ let sort_keys = get_sort_keys(&window_expr[0]);
+ if window_expr.len() > 1 {
+ debug_assert!(
+ window_expr[1..]
+ .iter()
+ .all(|expr| get_sort_keys(expr) == sort_keys),
+ "all window expressions shall have the same sort keys, as guaranteed by logical planning"
+ );
+ }
- let can_repartition = !partition_keys.is_empty()
- && ctx_state.config.target_partitions > 1
- && ctx_state.config.repartition_windows;
+ let logical_input_schema = input.schema();
- let input_exec = if can_repartition {
- let partition_keys = partition_keys
+ let input_exec = if sort_keys.is_empty() {
+ input_exec
+ } else {
+ let physical_input_schema = input_exec.schema();
+ let sort_keys = sort_keys
+ .iter()
+ .map(|e| match e {
+ Expr::Sort {
+ expr,
+ asc,
+ nulls_first,
+ } => self.create_physical_sort_expr(
+ expr,
+ logical_input_schema,
+ &physical_input_schema,
+ SortOptions {
+ descending: !*asc,
+ nulls_first: *nulls_first,
+ },
+ ctx_state,
+ ),
+ _ => unreachable!(),
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Arc::new(if can_repartition {
+ SortExec::new_with_partitioning(sort_keys, input_exec, true)
+ } else {
+ SortExec::try_new(sort_keys, input_exec)?
+ })
+ };
+
+ let physical_input_schema = input_exec.schema();
+ let window_expr = window_expr
.iter()
.map(|e| {
- self.create_physical_expr(
+ self.create_window_expr(
e,
- input.schema(),
- &input_exec.schema(),
+ logical_input_schema,
+ &physical_input_schema,
ctx_state,
)
})
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
- Arc::new(RepartitionExec::try_new(
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Arc::new(WindowAggExec::try_new(
+ window_expr,
input_exec,
- Partitioning::Hash(
- partition_keys,
- ctx_state.config.target_partitions,
- ),
- )?)
- } else {
- input_exec
- };
+ physical_input_schema,
+ )?) )
+ }
+ LogicalPlan::Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ ..
+ } => {
+ // Initially need to perform the aggregate and then merge the partitions
+ let input_exec = self.create_initial_plan(input, ctx_state).await?;
+ let physical_input_schema = input_exec.schema();
+ let logical_input_schema = input.as_ref().schema();
- // add a sort phase
- let get_sort_keys = |expr: &Expr| match expr {
- Expr::WindowFunction {
- ref partition_by,
- ref order_by,
- ..
- } => generate_sort_key(partition_by, order_by),
- _ => unreachable!(),
- };
- let sort_keys = get_sort_keys(&window_expr[0]);
- if window_expr.len() > 1 {
- debug_assert!(
- window_expr[1..]
+ let groups = group_expr
+ .iter()
+ .map(|e| {
+ tuple_err((
+ self.create_physical_expr(
+ e,
+ logical_input_schema,
+ &physical_input_schema,
+ ctx_state,
+ ),
+ physical_name(e),
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let aggregates = aggr_expr
+ .iter()
+ .map(|e| {
+ self.create_aggregate_expr(
+ e,
+ logical_input_schema,
+ &physical_input_schema,
+ ctx_state,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let initial_aggr = Arc::new(HashAggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ input_exec,
+ physical_input_schema.clone(),
+ )?);
+
+ // update group column indices based on partial aggregate plan evaluation
+ let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
+ .map(|i| col(&groups[i].1, &initial_aggr.schema()))
+ .collect::<Result<_>>()?;
+
+ // TODO: dictionary type not yet supported in Hash Repartition
+ let contains_dict = groups
+ .iter()
+ .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
+ .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
+ let can_repartition = !groups.is_empty()
+ && ctx_state.config.target_partitions > 1
+ && ctx_state.config.repartition_aggregations
+ && !contains_dict;
+
+ let (initial_aggr, next_partition_mode): (
+ Arc<dyn ExecutionPlan>,
+ AggregateMode,
+ ) = if can_repartition {
+ // Divide partial hash aggregates into multiple partitions by hash key
+ let hash_repartition = Arc::new(RepartitionExec::try_new(
+ initial_aggr,
+ Partitioning::Hash(
+ final_group.clone(),
+ ctx_state.config.target_partitions,
+ ),
+ )?);
+ // Combine hash aggregates within the partition
+ (hash_repartition, AggregateMode::FinalPartitioned)
+ } else {
+ // construct a second aggregation, keeping the final column name equal to the
+ // first aggregation and the expressions corresponding to the respective aggregate
+ (initial_aggr, AggregateMode::Final)
+ };
+
+ Ok(Arc::new(HashAggregateExec::try_new(
+ next_partition_mode,
+ final_group
.iter()
- .all(|expr| get_sort_keys(expr) == sort_keys),
- "all window expressions shall have the same sort keys, as guaranteed by logical planning"
- );
+ .enumerate()
+ .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+ .collect(),
+ aggregates,
+ initial_aggr,
+ physical_input_schema.clone(),
+ )?) )
}
+ LogicalPlan::Projection { input, expr, .. } => {
+ let input_exec = self.create_initial_plan(input, ctx_state).await?;
+ let input_schema = input.as_ref().schema();
- let logical_input_schema = input.schema();
+ let physical_exprs = expr
+ .iter()
+ .map(|e| {
+ // For projections, SQL planner and logical plan builder may convert user
+ // provided expressions into logical Column expressions if their results
+ // are already provided from the input plans. Because we work with
+ // qualified columns in logical plane, derived columns involve operators or
+ // functions will contain qualifers as well. This will result in logical
+ // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
+ //
+ // If we run these logical columns through physical_name function, we will
+ // get physical names with column qualifiers, which violates Datafusion's
+ // field name semantics. To account for this, we need to derive the
+ // physical name from physical input instead.
+ //
+ // This depends on the invariant that logical schema field index MUST match
+ // with physical schema field index.
+ let physical_name = if let Expr::Column(col) = e {
+ match input_schema.index_of_column(col) {
+ Ok(idx) => {
+ // index physical field using logical field index
+ Ok(input_exec.schema().field(idx).name().to_string())
+ }
+ // logical column is not a derived column, safe to pass along to
+ // physical_name
+ Err(_) => physical_name(e),
+ }
+ } else {
+ physical_name(e)
+ };
- let input_exec = if sort_keys.is_empty() {
- input_exec
- } else {
- let physical_input_schema = input_exec.schema();
- let sort_keys = sort_keys
+ tuple_err((
+ self.create_physical_expr(
+ e,
+ input_schema,
+ &input_exec.schema(),
+ ctx_state,
+ ),
+ physical_name,
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Arc::new(ProjectionExec::try_new(
+ physical_exprs,
+ input_exec,
+ )?) )
+ }
+ LogicalPlan::Filter {
+ input, predicate, ..
+ } => {
+ let physical_input = self.create_initial_plan(input, ctx_state).await?;
+ let input_schema = physical_input.as_ref().schema();
+ let input_dfschema = input.as_ref().schema();
+ let runtime_expr = self.create_physical_expr(
+ predicate,
+ input_dfschema,
+ &input_schema,
+ ctx_state,
+ )?;
+ Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?) )
+ }
+ LogicalPlan::Union { inputs, .. } => {
+ let physical_plans = futures::stream::iter(inputs)
+ .then(|lp| self.create_initial_plan(lp, ctx_state))
+ .try_collect::<Vec<_>>()
+ .await?;
+ Ok(Arc::new(UnionExec::new(physical_plans)) )
+ }
+ LogicalPlan::Repartition {
+ input,
+ partitioning_scheme,
+ } => {
+ let physical_input = self.create_initial_plan(input, ctx_state).await?;
+ let input_schema = physical_input.schema();
+ let input_dfschema = input.as_ref().schema();
+ let physical_partitioning = match partitioning_scheme {
+ LogicalPartitioning::RoundRobinBatch(n) => {
+ Partitioning::RoundRobinBatch(*n)
+ }
+ LogicalPartitioning::Hash(expr, n) => {
+ let runtime_expr = expr
+ .iter()
+ .map(|e| {
+ self.create_physical_expr(
+ e,
+ input_dfschema,
+ &input_schema,
+ ctx_state,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Partitioning::Hash(runtime_expr, *n)
+ }
+ };
+ Ok(Arc::new(RepartitionExec::try_new(
+ physical_input,
+ physical_partitioning,
+ )?) )
+ }
+ LogicalPlan::Sort { expr, input, .. } => {
+ let physical_input = self.create_initial_plan(input, ctx_state).await?;
+ let input_schema = physical_input.as_ref().schema();
+ let input_dfschema = input.as_ref().schema();
+ let sort_expr = expr
.iter()
.map(|e| match e {
Expr::Sort {
@@ -393,423 +646,175 @@ impl DefaultPhysicalPlanner {
nulls_first,
} => self.create_physical_sort_expr(
expr,
- logical_input_schema,
- &physical_input_schema,
+ input_dfschema,
+ &input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
ctx_state,
),
- _ => unreachable!(),
+ _ => Err(DataFusionError::Plan(
+ "Sort only accepts sort expressions".to_string(),
+ )),
})
.collect::<Result<Vec<_>>>()?;
- Arc::new(if can_repartition {
- SortExec::new_with_partitioning(sort_keys, input_exec, true)
- } else {
- SortExec::try_new(sort_keys, input_exec)?
- })
- };
-
- let physical_input_schema = input_exec.schema();
- let window_expr = window_expr
- .iter()
- .map(|e| {
- self.create_window_expr(
- e,
- logical_input_schema,
- &physical_input_schema,
- ctx_state,
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Arc::new(WindowAggExec::try_new(
- window_expr,
- input_exec,
- physical_input_schema,
- )?))
- }
- LogicalPlan::Aggregate {
- input,
- group_expr,
- aggr_expr,
- ..
- } => {
- // Initially need to perform the aggregate and then merge the partitions
- let input_exec = self.create_initial_plan(input, ctx_state)?;
- let physical_input_schema = input_exec.schema();
- let logical_input_schema = input.as_ref().schema();
-
- let groups = group_expr
- .iter()
- .map(|e| {
- tuple_err((
- self.create_physical_expr(
- e,
- logical_input_schema,
- &physical_input_schema,
- ctx_state,
- ),
- physical_name(e),
- ))
- })
- .collect::<Result<Vec<_>>>()?;
- let aggregates = aggr_expr
- .iter()
- .map(|e| {
- self.create_aggregate_expr(
- e,
- logical_input_schema,
- &physical_input_schema,
- ctx_state,
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- let initial_aggr = Arc::new(HashAggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- input_exec,
- physical_input_schema.clone(),
- )?);
-
- // update group column indices based on partial aggregate plan evaluation
- let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
- .map(|i| col(&groups[i].1, &initial_aggr.schema()))
- .collect::<Result<_>>()?;
-
- // TODO: dictionary type not yet supported in Hash Repartition
- let contains_dict = groups
- .iter()
- .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
- .any(|x| matches!(x, DataType::Dictionary(_, _)));
-
- let can_repartition = !groups.is_empty()
- && ctx_state.config.target_partitions > 1
- && ctx_state.config.repartition_aggregations
- && !contains_dict;
-
- let (initial_aggr, next_partition_mode): (
- Arc<dyn ExecutionPlan>,
- AggregateMode,
- ) = if can_repartition {
- // Divide partial hash aggregates into multiple partitions by hash key
- let hash_repartition = Arc::new(RepartitionExec::try_new(
- initial_aggr,
- Partitioning::Hash(
- final_group.clone(),
- ctx_state.config.target_partitions,
- ),
- )?);
- // Combine hash aggregates within the partition
- (hash_repartition, AggregateMode::FinalPartitioned)
- } else {
- // construct a second aggregation, keeping the final column name equal to the
- // first aggregation and the expressions corresponding to the respective aggregate
- (initial_aggr, AggregateMode::Final)
- };
-
- Ok(Arc::new(HashAggregateExec::try_new(
- next_partition_mode,
- final_group
+ Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) )
+ }
+ LogicalPlan::Join {
+ left,
+ right,
+ on: keys,
+ join_type,
+ ..
+ } => {
+ let left_df_schema = left.schema();
+ let physical_left = self.create_initial_plan(left, ctx_state).await?;
+ let right_df_schema = right.schema();
+ let physical_right = self.create_initial_plan(right, ctx_state).await?;
+ let join_on = keys
.iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
- .collect(),
- aggregates,
- initial_aggr,
- physical_input_schema.clone(),
- )?))
- }
- LogicalPlan::Projection { input, expr, .. } => {
- let input_exec = self.create_initial_plan(input, ctx_state)?;
- let input_schema = input.as_ref().schema();
-
- let physical_exprs = expr
- .iter()
- .map(|e| {
- // For projections, SQL planner and logical plan builder may convert user
- // provided expressions into logical Column expressions if their results
- // are already provided from the input plans. Because we work with
- // qualified columns in logical plane, derived columns involve operators or
- // functions will contain qualifers as well. This will result in logical
- // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
- //
- // If we run these logical columns through physical_name function, we will
- // get physical names with column qualifiers, which violates Datafusion's
- // field name semantics. To account for this, we need to derive the
- // physical name from physical input instead.
- //
- // This depends on the invariant that logical schema field index MUST match
- // with physical schema field index.
- let physical_name = if let Expr::Column(col) = e {
- match input_schema.index_of_column(col) {
- Ok(idx) => {
- // index physical field using logical field index
- Ok(input_exec.schema().field(idx).name().to_string())
- }
- // logical column is not a derived column, safe to pass along to
- // physical_name
- Err(_) => physical_name(e),
- }
- } else {
- physical_name(e)
- };
-
- tuple_err((
- self.create_physical_expr(
- e,
- input_schema,
- &input_exec.schema(),
- ctx_state,
- ),
- physical_name,
- ))
- })
- .collect::<Result<Vec<_>>>()?;
+ .map(|(l, r)| {
+ Ok((
+ Column::new(&l.name, left_df_schema.index_of_column(l)?),
+ Column::new(&r.name, right_df_schema.index_of_column(r)?),
+ ))
+ })
+ .collect::<Result<join_utils::JoinOn>>()?;
- Ok(Arc::new(ProjectionExec::try_new(
- physical_exprs,
- input_exec,
- )?))
- }
- LogicalPlan::Filter {
- input, predicate, ..
- } => {
- let physical_input = self.create_initial_plan(input, ctx_state)?;
- let input_schema = physical_input.as_ref().schema();
- let input_dfschema = input.as_ref().schema();
- let runtime_expr = self.create_physical_expr(
- predicate,
- input_dfschema,
- &input_schema,
- ctx_state,
- )?;
- Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
- }
- LogicalPlan::Union { inputs, .. } => {
- let physical_plans = inputs
- .iter()
- .map(|input| self.create_initial_plan(input, ctx_state))
- .collect::<Result<Vec<_>>>()?;
- Ok(Arc::new(UnionExec::new(physical_plans)))
- }
- LogicalPlan::Repartition {
- input,
- partitioning_scheme,
- } => {
- let physical_input = self.create_initial_plan(input, ctx_state)?;
- let input_schema = physical_input.schema();
- let input_dfschema = input.as_ref().schema();
- let physical_partitioning = match partitioning_scheme {
- LogicalPartitioning::RoundRobinBatch(n) => {
- Partitioning::RoundRobinBatch(*n)
- }
- LogicalPartitioning::Hash(expr, n) => {
- let runtime_expr = expr
+ if ctx_state.config.target_partitions > 1
+ && ctx_state.config.repartition_joins
+ {
+ let (left_expr, right_expr) = join_on
.iter()
- .map(|e| {
- self.create_physical_expr(
- e,
- input_dfschema,
- &input_schema,
- ctx_state,
+ .map(|(l, r)| {
+ (
+ Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+ Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
)
})
- .collect::<Result<Vec<_>>>()?;
- Partitioning::Hash(runtime_expr, *n)
- }
- };
- Ok(Arc::new(RepartitionExec::try_new(
- physical_input,
- physical_partitioning,
- )?))
- }
- LogicalPlan::Sort { expr, input, .. } => {
- let physical_input = self.create_initial_plan(input, ctx_state)?;
- let input_schema = physical_input.as_ref().schema();
- let input_dfschema = input.as_ref().schema();
-
- let sort_expr = expr
- .iter()
- .map(|e| match e {
- Expr::Sort {
- expr,
- asc,
- nulls_first,
- } => self.create_physical_sort_expr(
- expr,
- input_dfschema,
- &input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
- ctx_state,
- ),
- _ => Err(DataFusionError::Plan(
- "Sort only accepts sort expressions".to_string(),
- )),
- })
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?))
- }
- LogicalPlan::Join {
- left,
- right,
- on: keys,
- join_type,
- ..
- } => {
- let left_df_schema = left.schema();
- let physical_left = self.create_initial_plan(left, ctx_state)?;
- let right_df_schema = right.schema();
- let physical_right = self.create_initial_plan(right, ctx_state)?;
- let join_on = keys
- .iter()
- .map(|(l, r)| {
- Ok((
- Column::new(&l.name, left_df_schema.index_of_column(l)?),
- Column::new(&r.name, right_df_schema.index_of_column(r)?),
- ))
- })
- .collect::<Result<join_utils::JoinOn>>()?;
-
- if ctx_state.config.target_partitions > 1
- && ctx_state.config.repartition_joins
- {
- let (left_expr, right_expr) = join_on
- .iter()
- .map(|(l, r)| {
- (
- Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
- Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
- )
- })
- .unzip();
-
- // Use hash partition by default to parallelize hash joins
- Ok(Arc::new(HashJoinExec::try_new(
- Arc::new(RepartitionExec::try_new(
+ .unzip();
+
+ // Use hash partition by default to parallelize hash joins
+ Ok(Arc::new(HashJoinExec::try_new(
+ Arc::new(RepartitionExec::try_new(
+ physical_left,
+ Partitioning::Hash(
+ left_expr,
+ ctx_state.config.target_partitions,
+ ),
+ )?),
+ Arc::new(RepartitionExec::try_new(
+ physical_right,
+ Partitioning::Hash(
+ right_expr,
+ ctx_state.config.target_partitions,
+ ),
+ )?),
+ join_on,
+ join_type,
+ PartitionMode::Partitioned,
+ )?))
+ } else {
+ Ok(Arc::new(HashJoinExec::try_new(
physical_left,
- Partitioning::Hash(
- left_expr,
- ctx_state.config.target_partitions,
- ),
- )?),
- Arc::new(RepartitionExec::try_new(
physical_right,
- Partitioning::Hash(
- right_expr,
- ctx_state.config.target_partitions,
- ),
- )?),
- join_on,
- join_type,
- PartitionMode::Partitioned,
- )?))
- } else {
- Ok(Arc::new(HashJoinExec::try_new(
- physical_left,
- physical_right,
- join_on,
- join_type,
- PartitionMode::CollectLeft,
- )?))
+ join_on,
+ join_type,
+ PartitionMode::CollectLeft,
+ )?))
+ }
}
- }
- LogicalPlan::CrossJoin { left, right, .. } => {
- let left = self.create_initial_plan(left, ctx_state)?;
- let right = self.create_initial_plan(right, ctx_state)?;
- Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
- }
- LogicalPlan::EmptyRelation {
- produce_one_row,
- schema,
- } => Ok(Arc::new(EmptyExec::new(
- *produce_one_row,
- SchemaRef::new(schema.as_ref().to_owned().into()),
- ))),
- LogicalPlan::Limit { input, n, .. } => {
- let limit = *n;
- let input = self.create_initial_plan(input, ctx_state)?;
-
- // GlobalLimitExec requires a single partition for input
- let input = if input.output_partitioning().partition_count() == 1 {
- input
- } else {
- // Apply a LocalLimitExec to each partition. The optimizer will also insert
- // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
- Arc::new(LocalLimitExec::new(input, limit))
- };
-
- Ok(Arc::new(GlobalLimitExec::new(input, limit)))
- }
- LogicalPlan::CreateExternalTable { .. } => {
- // There is no default plan for "CREATE EXTERNAL
- // TABLE" -- it must be handled at a higher level (so
- // that the appropriate table can be registered with
- // the context)
- Err(DataFusionError::Internal(
- "Unsupported logical plan: CreateExternalTable".to_string(),
- ))
- }
- LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
- "Unsupported logical plan: Explain must be root of the plan".to_string(),
- )),
- LogicalPlan::Analyze {
- verbose,
- input,
- schema,
- } => {
- let input = self.create_initial_plan(input, ctx_state)?;
- let schema = SchemaRef::new(schema.as_ref().to_owned().into());
- Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
- }
- LogicalPlan::Extension { node } => {
- let physical_inputs = node
- .inputs()
- .into_iter()
- .map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
- .collect::<Result<Vec<_>>>()?;
+ LogicalPlan::CrossJoin { left, right, .. } => {
+ let left = self.create_initial_plan(left, ctx_state).await?;
+ let right = self.create_initial_plan(right, ctx_state).await?;
+ Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
+ }
+ LogicalPlan::EmptyRelation {
+ produce_one_row,
+ schema,
+ } => Ok(Arc::new(EmptyExec::new(
+ *produce_one_row,
+ SchemaRef::new(schema.as_ref().to_owned().into()),
+ ))),
+ LogicalPlan::Limit { input, n, .. } => {
+ let limit = *n;
+ let input = self.create_initial_plan(input, ctx_state).await?;
+
+ // GlobalLimitExec requires a single partition for input
+ let input = if input.output_partitioning().partition_count() == 1 {
+ input
+ } else {
+ // Apply a LocalLimitExec to each partition. The optimizer will also insert
+ // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
+ Arc::new(LocalLimitExec::new(input, limit))
+ };
- let maybe_plan = self.extension_planners.iter().try_fold(
- None,
- |maybe_plan, planner| {
- if let Some(plan) = maybe_plan {
- Ok(Some(plan))
- } else {
- planner.plan_extension(
- self,
- node.as_ref(),
- &node.inputs(),
- &physical_inputs,
- ctx_state,
- )
- }
- },
- )?;
- let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
- "No installed planner was able to convert the custom node to an execution plan: {:?}", node
- )))?;
-
- // Ensure the ExecutionPlan's schema matches the
- // declared logical schema to catch and warn about
- // logic errors when creating user defined plans.
- if !node.schema().matches_arrow_schema(&plan.schema()) {
- Err(DataFusionError::Plan(format!(
- "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
- LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
- node, node.schema(), plan.schema()
- )))
- } else {
- Ok(plan)
+ Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
- }
- }
+ LogicalPlan::CreateExternalTable { .. } => {
+ // There is no default plan for "CREATE EXTERNAL
+ // TABLE" -- it must be handled at a higher level (so
+ // that the appropriate table can be registered with
+ // the context)
+ Err(DataFusionError::Internal(
+ "Unsupported logical plan: CreateExternalTable".to_string(),
+ ))
+ }
+ LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
+ "Unsupported logical plan: Explain must be root of the plan".to_string(),
+ )),
+ LogicalPlan::Analyze {
+ verbose,
+ input,
+ schema,
+ } => {
+ let input = self.create_initial_plan(input, ctx_state).await?;
+ let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+ Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
+ }
+ LogicalPlan::Extension { node } => {
+ let physical_inputs = futures::stream::iter(node.inputs())
+ .then(|lp| self.create_initial_plan(lp, ctx_state))
+ .try_collect::<Vec<_>>()
+ .await?;
+
+ let maybe_plan = self.extension_planners.iter().try_fold(
+ None,
+ |maybe_plan, planner| {
+ if let Some(plan) = maybe_plan {
+ Ok(Some(plan))
+ } else {
+ planner.plan_extension(
+ self,
+ node.as_ref(),
+ &node.inputs(),
+ &physical_inputs,
+ ctx_state,
+ )
+ }
+ },
+ )?;
+ let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
+ "No installed planner was able to convert the custom node to an execution plan: {:?}", node
+ )))?;
+
+ // Ensure the ExecutionPlan's schema matches the
+ // declared logical schema to catch and warn about
+ // logic errors when creating user defined plans.
+ if !node.schema().matches_arrow_schema(&plan.schema()) {
+ Err(DataFusionError::Plan(format!(
+ "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
+ LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
+ node, node.schema(), plan.schema()
+ )))
+ } else {
+ Ok(plan)
+ }
+ }
+ };
+ exec_plan
+ }.boxed()
}
/// Create a physical expression from a logical expression
@@ -1315,7 +1320,7 @@ impl DefaultPhysicalPlanner {
/// Returns
/// Some(plan) if optimized, and None if logical_plan was not an
/// explain (and thus needs to be optimized as normal)
- fn handle_explain(
+ async fn handle_explain(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
@@ -1332,7 +1337,7 @@ impl DefaultPhysicalPlanner {
stringified_plans.push(plan.to_stringified(FinalLogicalPlan));
- let input = self.create_initial_plan(plan, ctx_state)?;
+ let input = self.create_initial_plan(plan, ctx_state).await?;
stringified_plans
.push(displayable(input.as_ref()).to_stringified(InitialPhysicalPlan));
@@ -1411,15 +1416,15 @@ mod tests {
ExecutionContextState::new()
}
- fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
+ async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx_state = make_ctx_state();
ctx_state.config.target_partitions = 4;
let planner = DefaultPhysicalPlanner::default();
- planner.create_physical_plan(logical_plan, &ctx_state)
+ planner.create_physical_plan(logical_plan, &ctx_state).await
}
- #[test]
- fn test_all_operators() -> Result<()> {
+ #[tokio::test]
+ async fn test_all_operators() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
@@ -1433,7 +1438,7 @@ mod tests {
.limit(10)?
.build()?;
- let plan = plan(&logical_plan)?;
+ let plan = plan(&logical_plan).await?;
// verify that the plan correctly casts u8 to i64
// the cast here is implicit so has CastOptions with safe=true
@@ -1443,8 +1448,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_create_not() -> Result<()> {
+ #[tokio::test]
+ async fn test_create_not() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
let dfschema = DFSchema::try_from(schema.clone())?;
@@ -1463,8 +1468,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_with_csv_plan() -> Result<()> {
+ #[tokio::test]
+ async fn test_with_csv_plan() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
@@ -1473,7 +1478,7 @@ mod tests {
.filter(col("c7").lt(col("c12")))?
.build()?;
- let plan = plan(&logical_plan)?;
+ let plan = plan(&logical_plan).await?;
// c12 is f64, c7 is u8 -> cast c7 to f64
// the cast here is implicit so has CastOptions with safe=true
@@ -1482,8 +1487,8 @@ mod tests {
Ok(())
}
- #[test]
- fn errors() -> Result<()> {
+ #[tokio::test]
+ async fn errors() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1517,14 +1522,16 @@ mod tests {
Ok(())
}
- #[test]
- fn default_extension_planner() {
+ #[tokio::test]
+ async fn default_extension_planner() {
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::default();
let logical_plan = LogicalPlan::Extension {
node: Arc::new(NoOpExtensionNode::default()),
};
- let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
+ let plan = planner
+ .create_physical_plan(&logical_plan, &ctx_state)
+ .await;
let expected_error =
"No installed planner was able to convert the custom node to an execution plan: NoOp";
@@ -1539,8 +1546,8 @@ mod tests {
}
}
- #[test]
- fn bad_extension_planner() {
+ #[tokio::test]
+ async fn bad_extension_planner() {
// Test that creating an execution plan whose schema doesn't
// match the logical plan's schema generates an error.
let ctx_state = make_ctx_state();
@@ -1551,7 +1558,9 @@ mod tests {
let logical_plan = LogicalPlan::Extension {
node: Arc::new(NoOpExtensionNode::default()),
};
- let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
+ let plan = planner
+ .create_physical_plan(&logical_plan, &ctx_state)
+ .await;
let expected_error: &str = "Error during planning: \
Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
@@ -1584,8 +1593,8 @@ mod tests {
}
}
- #[test]
- fn in_list_types() -> Result<()> {
+ #[tokio::test]
+ async fn in_list_types() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1600,7 +1609,7 @@ mod tests {
.filter(col("c12").lt(lit(0.05)))?
.project(vec![col("c1").in_list(list, false)])?
.build()?;
- let execution_plan = plan(&logical_plan)?;
+ let execution_plan = plan(&logical_plan).await?;
// verify that the plan correctly adds cast from Int64(1) to Utf8
let expected = "InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }], negated: false }";
assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1615,7 +1624,7 @@ mod tests {
.filter(col("c12").lt(lit(0.05)))?
.project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
.build()?;
- let execution_plan = plan(&logical_plan);
+ let execution_plan = plan(&logical_plan).await;
let expected_error = "Unsupported CAST from Utf8 to Boolean";
match execution_plan {
@@ -1631,8 +1640,8 @@ mod tests {
Ok(())
}
- #[test]
- fn hash_agg_input_schema() -> Result<()> {
+ #[tokio::test]
+ async fn hash_agg_input_schema() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
@@ -1646,7 +1655,7 @@ mod tests {
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;
- let execution_plan = plan(&logical_plan)?;
+ let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
.as_any()
.downcast_ref::<HashAggregateExec>()
@@ -1662,8 +1671,8 @@ mod tests {
Ok(())
}
- #[test]
- fn hash_agg_group_by_partitioned() -> Result<()> {
+ #[tokio::test]
+ async fn hash_agg_group_by_partitioned() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
@@ -1672,7 +1681,7 @@ mod tests {
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;
- let execution_plan = plan(&logical_plan)?;
+ let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{:?}", execution_plan);
// Make sure the plan contains a FinalPartitioned, which means it will not use the Final
@@ -1682,8 +1691,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_explain() {
+ #[tokio::test]
+ async fn test_explain() {
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let logical_plan =
@@ -1694,7 +1703,7 @@ mod tests {
.build()
.unwrap();
- let plan = plan(&logical_plan).unwrap();
+ let plan = plan(&logical_plan).await.unwrap();
if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
let stringified_plans = plan.stringified_plans();
assert!(stringified_plans.len() >= 4);
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 31551a9..a6456f1 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -182,6 +182,7 @@ impl ExecutionPlan for CustomExecutionPlan {
}
}
+#[async_trait]
impl TableProvider for CustomTableProvider {
fn as_any(&self) -> &dyn Any {
self
@@ -191,7 +192,7 @@ impl TableProvider for CustomTableProvider {
TEST_CUSTOM_SCHEMA_REF!()
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
@@ -236,7 +237,7 @@ async fn custom_source_dataframe() -> Result<()> {
);
assert_eq!(format!("{:?}", optimized_plan), expected);
- let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+ let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -260,7 +261,10 @@ async fn optimizers_catch_all_statistics() {
.sql("SELECT count(*), min(c1), max(c1) from test")
.unwrap();
- let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+ let physical_plan = ctx
+ .create_physical_plan(&df.to_logical_plan())
+ .await
+ .unwrap();
// when the optimization kicks in, the source is replaced by an EmptyExec
assert!(
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 14f5dd2..511a9e6 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -533,6 +533,7 @@ impl ContextWithParquet {
let physical_plan = self
.ctx
.create_physical_plan(&logical_plan)
+ .await
.expect("creating physical plan");
let results = datafusion::physical_plan::collect(physical_plan.clone())
diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs
index e0102c4..653b96c 100644
--- a/datafusion/tests/provider_filter_pushdown.rs
+++ b/datafusion/tests/provider_filter_pushdown.rs
@@ -110,6 +110,7 @@ struct CustomProvider {
one_batch: RecordBatch,
}
+#[async_trait]
impl TableProvider for CustomProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -119,7 +120,7 @@ impl TableProvider for CustomProvider {
self.zero_batch.schema()
}
- fn scan(
+ async fn scan(
&self,
_: &Option<Vec<usize>>,
_: usize,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4cd0ed3..a67c82b 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -188,7 +188,7 @@ async fn parquet_single_nan_schema() {
let sql = "SELECT mycol FROM single_nan";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).await.unwrap();
let results = collect(plan).await.unwrap();
for batch in results {
assert_eq!(1, batch.num_rows());
@@ -223,7 +223,7 @@ async fn parquet_list_columns() {
let sql = "SELECT int64_list, utf8_list FROM list_columns";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).await.unwrap();
let results = collect(plan).await.unwrap();
// int64_list utf8_list
@@ -928,7 +928,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> {
let sql = "SELECT avg(c12) FROM aggregate_test_100";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).await.unwrap();
let results = collect(plan).await.unwrap();
let batch = &results[0];
let column = batch.column(0);
@@ -2366,7 +2366,7 @@ async fn explain_analyze_baseline_metrics() {
println!("running query: {}", sql);
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+ let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let results = collect(physical_plan.clone()).await.unwrap();
let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
println!("Query Output:\n\n{}", formatted);
@@ -2648,7 +2648,7 @@ async fn csv_explain_plans() {
// Physical plan
// Create plan
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
- let plan = ctx.create_physical_plan(&plan).expect(&msg);
+ let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
//
// Execute plan
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
@@ -2845,7 +2845,7 @@ async fn csv_explain_verbose_plans() {
// Physical plan
// Create plan
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
- let plan = ctx.create_physical_plan(&plan).expect(&msg);
+ let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
//
// Execute plan
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
@@ -3002,7 +3002,7 @@ async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec<Record
let optimized_logical_schema = plan.schema();
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
- let plan = ctx.create_physical_plan(&plan).expect(&msg);
+ let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let results = collect(plan).await.expect(&msg);
@@ -4401,7 +4401,7 @@ async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
let plan = ctx.create_logical_plan(sql).expect(&msg);
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
- let plan = ctx.create_physical_plan(&plan).expect(&msg);
+ let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let res = collect(plan).await.expect(&msg);
@@ -4439,7 +4439,7 @@ async fn test_cast_expressions_error() -> Result<()> {
let sql = "SELECT CAST(c1 AS INT) FROM aggregate_test_100";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).await.unwrap();
let result = collect(plan).await;
match result {
@@ -4469,7 +4469,7 @@ async fn test_physical_plan_display_indent() {
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+ let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let expected = vec![
"GlobalLimitExec: limit=10",
" SortExec: [the_min@2 DESC]",
@@ -4517,7 +4517,7 @@ async fn test_physical_plan_display_indent_multi_children() {
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+ let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let expected = vec![
"ProjectionExec: expr=[c1@0 as c1]",
" CoalesceBatchesExec: target_batch_size=4096",
@@ -4555,7 +4555,7 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
register_aggregate_csv(&mut ctx)?;
let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
let logical_plan = ctx.create_logical_plan(sql)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan);
+ let physical_plan = ctx.create_physical_plan(&logical_plan).await;
let err = physical_plan.unwrap_err();
assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
Ok(())
@@ -4875,7 +4875,7 @@ async fn avro_single_nan_schema() {
let sql = "SELECT mycol FROM single_nan";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).await.unwrap();
let results = collect(plan).await.unwrap();
for batch in results {
assert_eq!(1, batch.num_rows());
diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs
index a2375ad..7a19aa7 100644
--- a/datafusion/tests/statistics.rs
+++ b/datafusion/tests/statistics.rs
@@ -59,6 +59,7 @@ impl StatisticsValidation {
}
}
+#[async_trait]
impl TableProvider for StatisticsValidation {
fn as_any(&self) -> &dyn Any {
self
@@ -68,7 +69,7 @@ impl TableProvider for StatisticsValidation {
Arc::clone(&self.schema)
}
- fn scan(
+ async fn scan(
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
@@ -212,7 +213,10 @@ async fn sql_basic() -> Result<()> {
let df = ctx.sql("SELECT * from stats_table").unwrap();
- let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+ let physical_plan = ctx
+ .create_physical_plan(&df.to_logical_plan())
+ .await
+ .unwrap();
// the statistics should be those of the source
assert_eq!(stats, physical_plan.statistics());
@@ -227,7 +231,10 @@ async fn sql_filter() -> Result<()> {
let df = ctx.sql("SELECT * FROM stats_table WHERE c1 = 5").unwrap();
- let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+ let physical_plan = ctx
+ .create_physical_plan(&df.to_logical_plan())
+ .await
+ .unwrap();
// with a filtering condition we lose all knowledge about the statistics
assert_eq!(Statistics::default(), physical_plan.statistics());
@@ -241,7 +248,10 @@ async fn sql_limit() -> Result<()> {
let mut ctx = init_ctx(stats.clone(), schema)?;
let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").unwrap();
- let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+ let physical_plan = ctx
+ .create_physical_plan(&df.to_logical_plan())
+ .await
+ .unwrap();
// when the limit is smaller than the original number of lines
// we lose all statistics except for the number of rows, which becomes the limit
assert_eq!(
@@ -254,7 +264,10 @@ async fn sql_limit() -> Result<()> {
);
let df = ctx.sql("SELECT * FROM stats_table LIMIT 100").unwrap();
- let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+ let physical_plan = ctx
+ .create_physical_plan(&df.to_logical_plan())
+ .await
+ .unwrap();
// when the limit is larger than the original number of lines, statistics remain unchanged
assert_eq!(stats, physical_plan.statistics());
@@ -270,7 +283,10 @@ async fn sql_window() -> Result<()> {
.sql("SELECT c2, sum(c1) over (partition by c2) FROM stats_table")
.unwrap();
- let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+ let physical_plan = ctx
+ .create_physical_plan(&df.to_logical_plan())
+ .await
+ .unwrap();
let result = physical_plan.statistics();
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index d57a24c..14600f2 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -202,10 +202,11 @@ fn make_topk_context() -> ExecutionContext {
struct TopKQueryPlanner {}
+#[async_trait]
impl QueryPlanner for TopKQueryPlanner {
/// Given a `LogicalPlan` created from above, create an
/// `ExecutionPlan` suitable for execution
- fn create_physical_plan(
+ async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
@@ -216,7 +217,9 @@ impl QueryPlanner for TopKQueryPlanner {
TopKPlanner {},
)]);
// Delegate most work of physical planning to the default physical planner
- physical_planner.create_physical_plan(logical_plan, ctx_state)
+ physical_planner
+ .create_physical_plan(logical_plan, ctx_state)
+ .await
}
}
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 0885ae3..48da234 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -122,14 +122,19 @@ impl DataFrame {
/// Unless some order is specified in the plan, there is no guarantee of the order of the result
fn collect(&self, py: Python) -> PyResult<PyObject> {
let ctx = _ExecutionContext::from(self.ctx_state.clone());
+ let rt = Runtime::new().unwrap();
let plan = ctx
.optimize(&self.plan)
.map_err(|e| -> errors::DataFusionError { e.into() })?;
- let plan = ctx
- .create_physical_plan(&plan)
- .map_err(|e| -> errors::DataFusionError { e.into() })?;
- let rt = Runtime::new().unwrap();
+ let plan = py.allow_threads(|| {
+ rt.block_on(async {
+ ctx.create_physical_plan(&plan)
+ .await
+ .map_err(|e| -> errors::DataFusionError { e.into() })
+ })
+ })?;
+
let batches = py.allow_threads(|| {
rt.block_on(async {
collect(plan)
@@ -144,12 +149,20 @@ impl DataFrame {
#[args(num = "20")]
fn show(&self, py: Python, num: usize) -> PyResult<()> {
let ctx = _ExecutionContext::from(self.ctx_state.clone());
- let plan = ctx
- .optimize(&self.limit(num)?.plan)
- .and_then(|plan| ctx.create_physical_plan(&plan))
- .map_err(|e| -> errors::DataFusionError { e.into() })?;
-
let rt = Runtime::new().unwrap();
+ let plan = py.allow_threads(|| {
+ rt.block_on(async {
+ let l_plan = ctx
+ .optimize(&self.limit(num)?.plan)
+ .map_err(|e| -> errors::DataFusionError { e.into() })?;
+ let p_plan = ctx
+ .create_physical_plan(&l_plan)
+ .await
+ .map_err(|e| -> errors::DataFusionError { e.into() })?;
+ Ok::<_, PyErr>(p_plan)
+ })
+ })?;
+
let batches = py.allow_threads(|| {
rt.block_on(async {
collect(plan)