You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/30 18:57:48 UTC
[arrow-datafusion] branch main updated: Remove Rayon-based Scheduler (#6169)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 71efcf5ee8 Remove Rayon-based Scheduler (#6169)
71efcf5ee8 is described below
commit 71efcf5ee8900a1efe12fb812e210e6941060733
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun Apr 30 19:57:41 2023 +0100
Remove Rayon-based Scheduler (#6169)
* Remove scheduler
* Fix tpch
* Format
---
.github/workflows/docs_pr.yaml | 2 +-
.github/workflows/rust.yml | 11 +-
benchmarks/Cargo.toml | 2 +-
benchmarks/src/bin/tpch.rs | 34 +-
ci/scripts/rust_clippy.sh | 2 +-
datafusion/core/Cargo.toml | 4 -
datafusion/core/benches/parquet_query_sql.rs | 19 -
datafusion/core/src/lib.rs | 2 -
datafusion/core/src/scheduler/mod.rs | 460 -------------------
.../core/src/scheduler/pipeline/execution.rs | 307 -------------
datafusion/core/src/scheduler/pipeline/mod.rs | 111 -----
.../core/src/scheduler/pipeline/repartition.rs | 155 -------
datafusion/core/src/scheduler/plan.rs | 296 ------------
datafusion/core/src/scheduler/task.rs | 509 ---------------------
14 files changed, 15 insertions(+), 1899 deletions(-)
diff --git a/.github/workflows/docs_pr.yaml b/.github/workflows/docs_pr.yaml
index 29908abba0..821321c8c5 100644
--- a/.github/workflows/docs_pr.yaml
+++ b/.github/workflows/docs_pr.yaml
@@ -49,6 +49,6 @@ jobs:
rust-version: stable
# Note: this does not include dictionary_expressions to reduce codegen
- name: Run doctests
- run: cargo test --doc --features avro,scheduler,json
+ run: cargo test --doc --features avro,json
- name: Verify Working Directory Clean
run: git diff --exit-code
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e53d66945d..654c0d5649 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -67,8 +67,7 @@ jobs:
# Note: this does not include dictionary_expressions to reduce codegen
- name: Check workspace with all features
- run: cargo check --workspace --benches --features avro,scheduler,json
-
+ run: cargo check --workspace --benches --features avro,json
- name: Check Cargo.lock for datafusion-cli
run: |
# If this test fails, try running `cargo update` in the `datafusion-cli` directory
@@ -97,7 +96,7 @@ jobs:
with:
rust-version: stable
- name: Run tests (excluding doctests)
- run: cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
+ run: cargo test --lib --tests --bins --features avro,json,dictionary_expressions
- name: Verify Working Directory Clean
run: git diff --exit-code
@@ -153,7 +152,7 @@ jobs:
rust-version: stable
# Note: this does not include dictionary_expressions to reduce codegen
- name: Run doctests
- run: cargo test --doc --features avro,scheduler,json
+ run: cargo test --doc --features avro,json
- name: Verify Working Directory Clean
run: git diff --exit-code
@@ -272,7 +271,7 @@ jobs:
shell: bash
run: |
export PATH=$PATH:$HOME/d/protoc/bin
- cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
+ cargo test --lib --tests --bins --features avro,json,dictionary_expressions
env:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"
@@ -305,7 +304,7 @@ jobs:
- name: Run tests (excluding doctests)
shell: bash
run: |
- cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
+ cargo test --lib --tests --bins --features avro,json,dictionary_expressions
env:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index ce0545ba00..91effa3707 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,7 +34,7 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
arrow = { workspace = true }
-datafusion = { path = "../datafusion/core", version = "23.0.0", features = ["scheduler"] }
+datafusion = { path = "../datafusion/core", version = "23.0.0" }
env_logger = "0.10"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 36da520cb7..43659a6f9f 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -37,8 +37,6 @@ use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Instant};
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::scheduler::Scheduler;
-use futures::TryStreamExt;
use structopt::StructOpt;
#[cfg(feature = "snmalloc")]
@@ -90,10 +88,6 @@ struct DataFusionBenchmarkOpt {
/// Whether to disable collection of statistics (and cost based optimizations) or not.
#[structopt(short = "S", long = "disable-statistics")]
disable_statistics: bool,
-
- /// Enable scheduler
- #[structopt(short = "e", long = "enable-scheduler")]
- enable_scheduler: bool,
}
#[derive(Debug, StructOpt)]
@@ -227,16 +221,14 @@ async fn benchmark_query(
if query_id == 15 {
for (n, query) in sql.iter().enumerate() {
if n == 1 {
- result = execute_query(&ctx, query, opt.debug, opt.enable_scheduler)
- .await?;
+ result = execute_query(&ctx, query, opt.debug).await?;
} else {
- execute_query(&ctx, query, opt.debug, opt.enable_scheduler).await?;
+ execute_query(&ctx, query, opt.debug).await?;
}
}
} else {
for query in sql {
- result =
- execute_query(&ctx, query, opt.debug, opt.enable_scheduler).await?;
+ result = execute_query(&ctx, query, opt.debug).await?;
}
}
@@ -295,7 +287,6 @@ async fn execute_query(
ctx: &SessionContext,
sql: &str,
debug: bool,
- enable_scheduler: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
@@ -315,15 +306,7 @@ async fn execute_query(
displayable(physical_plan.as_ref()).indent()
);
}
- let result = if enable_scheduler {
- let scheduler = Scheduler::new(num_cpus::get());
- let results = scheduler
- .schedule(physical_plan.clone(), state.task_ctx())
- .unwrap();
- results.stream().try_collect().await?
- } else {
- collect(physical_plan.clone(), state.task_ctx()).await?
- };
+ let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
@@ -529,8 +512,7 @@ mod tests {
// handle special q15 which contains "create view" sql statement
if sql.starts_with("select") {
let explain = "explain ".to_string() + sql;
- let result_batch =
- execute_query(&ctx, explain.as_str(), false, false).await?;
+ let result_batch = execute_query(&ctx, explain.as_str(), false).await?;
if !actual.is_empty() {
actual += "\n";
}
@@ -542,7 +524,7 @@ mod tests {
// let mut file = File::create(format!("expected-plans/q{}.txt", query))?;
// file.write_all(actual.as_bytes())?;
} else {
- execute_query(&ctx, sql.as_str(), false, false).await?;
+ execute_query(&ctx, sql.as_str(), false).await?;
}
}
@@ -726,7 +708,7 @@ mod tests {
let sql = &get_query_sql(n)?;
for query in sql {
- execute_query(&ctx, query, false, false).await?;
+ execute_query(&ctx, query, false).await?;
}
Ok(())
@@ -757,7 +739,6 @@ mod ci {
mem_table: false,
output_path: None,
disable_statistics: false,
- enable_scheduler: false,
};
register_tables(&opt, &ctx).await?;
let queries = get_query_sql(query)?;
@@ -1064,7 +1045,6 @@ mod ci {
mem_table: false,
output_path: None,
disable_statistics: false,
- enable_scheduler: false,
};
let mut results = benchmark_datafusion(opt).await?;
assert_eq!(results.len(), 1);
diff --git a/ci/scripts/rust_clippy.sh b/ci/scripts/rust_clippy.sh
index d22a098905..dfd2916981 100755
--- a/ci/scripts/rust_clippy.sh
+++ b/ci/scripts/rust_clippy.sh
@@ -18,4 +18,4 @@
# under the License.
set -ex
-cargo clippy --all-targets --workspace --features avro,pyarrow,scheduler -- -D warnings
+cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 1fe9947a20..023848641f 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -46,8 +46,6 @@ dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions", "da
force_hash_collisions = []
pyarrow = ["datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
-# Used to enable scheduler
-scheduler = ["rayon"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusion-optimizer/unicode_expressions", "datafusion-sql/unicode_expressions"]
@@ -86,7 +84,6 @@ parquet = { workspace = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
-rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
sqlparser = { version = "0.33", features = ["visitor"] }
tempfile = "3"
@@ -150,7 +147,6 @@ name = "physical_plan"
[[bench]]
harness = false
name = "parquet_query_sql"
-required-features = ["scheduler"]
[[bench]]
harness = false
diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs
index 9b0d809629..876b1fe7e1 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -25,7 +25,6 @@ use arrow::datatypes::{
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion::scheduler::Scheduler;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
@@ -196,8 +195,6 @@ fn criterion_benchmark(c: &mut Criterion) {
let config = SessionConfig::new().with_target_partitions(partitions);
let context = SessionContext::with_config(config);
- let scheduler = Scheduler::new(partitions);
-
let local_rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
@@ -249,22 +246,6 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});
});
-
- c.bench_function(&format!("scheduled: {query}"), |b| {
- b.iter(|| {
- let query = query.clone();
- let context = context.clone();
-
- local_rt.block_on(async {
- let query = context.sql(&query).await.unwrap();
- let plan = query.create_physical_plan().await.unwrap();
- let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
-
- let mut stream = results.stream();
- while stream.next().await.transpose().unwrap().is_some() {}
- });
- });
- });
}
// Temporary file must outlive the benchmarks, it is deleted when dropped
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index ddc63c157f..4082a22daa 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -422,8 +422,6 @@ pub mod physical_optimizer;
pub mod physical_plan;
pub mod prelude;
pub mod scalar;
-#[cfg(feature = "scheduler")]
-pub mod scheduler;
pub mod variable;
// re-export dependencies from arrow-rs to minimise version maintenance for crate users
diff --git a/datafusion/core/src/scheduler/mod.rs b/datafusion/core/src/scheduler/mod.rs
deleted file mode 100644
index 790c990467..0000000000
--- a/datafusion/core/src/scheduler/mod.rs
+++ /dev/null
@@ -1,460 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
-//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
-//! and is designed to decouple the execution parallelism from the parallelism expressed in
-//! the physical plan as partitions.
-//!
-//! # Implementation
-//!
-//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
-//! chunks called pipelines. Each pipeline may consist of one or more nodes from the
-//! [`ExecutionPlan`] tree.
-//!
-//! The scheduler then maintains a list of pending `Task`s, that identify a partition within
-//! a particular pipeline that may be able to make progress on some "morsel" of data. These
-//! `Task`s are then scheduled on the worker pool, with a preference for scheduling work
-//! on a given "morsel" on the same thread that produced it.
-//!
-//! # Rayon
-//!
-//! Under-the-hood these `Task`s are scheduled by [rayon], which is a lightweight, work-stealing
-//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
-//! structured concurrency primitives to express additional parallelism that may be exploited
-//! if there are idle threads available at runtime
-//!
-//! # Shutdown
-//!
-//! Queries scheduled on a [`Scheduler`] will run to completion even if the
-//! [`Scheduler`] is dropped
-//!
-//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
-//! [rayon]: https://docs.rs/rayon/latest/rayon/
-//!
-//! # Example
-//!
-//! ```rust
-//! # use futures::TryStreamExt;
-//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
-//! # use datafusion::scheduler::Scheduler;
-//!
-//! # #[tokio::main]
-//! # async fn main() {
-//! let scheduler = Scheduler::new(4);
-//! let config = SessionConfig::new().with_target_partitions(4);
-//! let context = SessionContext::with_config(config);
-//!
-//! context.register_csv("example", "../core/tests/data/example.csv", CsvReadOptions::new()).await.unwrap();
-//! let plan = context.sql("SELECT MIN(b) FROM example")
-//! .await
-//! .unwrap()
-//! .create_physical_plan()
-//! .await
-//! .unwrap();
-//!
-//! let task = context.task_ctx();
-//! let results = scheduler.schedule(plan, task).unwrap();
-//! let scheduled: Vec<_> = results.stream().try_collect().await.unwrap();
-//! # }
-//! ```
-//!
-
-use std::sync::Arc;
-
-use log::{debug, error};
-
-use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::physical_plan::ExecutionPlan;
-
-use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
-use task::{spawn_plan, Task};
-
-use rayon::{ThreadPool, ThreadPoolBuilder};
-
-pub use task::ExecutionResults;
-
-mod pipeline;
-mod plan;
-mod task;
-
-/// Builder for a [`Scheduler`]
-#[derive(Debug)]
-pub struct SchedulerBuilder {
- inner: ThreadPoolBuilder,
-}
-
-impl SchedulerBuilder {
- /// Create a new [`SchedulerBuilder`] with the provided number of threads
- pub fn new(num_threads: usize) -> Self {
- let builder = ThreadPoolBuilder::new()
- .num_threads(num_threads)
- .panic_handler(|p| error!("{}", format_worker_panic(p)))
- .thread_name(|idx| format!("df-worker-{idx}"));
-
- Self { inner: builder }
- }
-
- /// Registers a custom panic handler
- #[cfg(test)]
- fn panic_handler<H>(self, panic_handler: H) -> Self
- where
- H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
- {
- Self {
- inner: self.inner.panic_handler(panic_handler),
- }
- }
-
- /// Build a new [`Scheduler`]
- fn build(self) -> Scheduler {
- Scheduler {
- pool: Arc::new(self.inner.build().unwrap()),
- }
- }
-}
-
-/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool
-pub struct Scheduler {
- pool: Arc<ThreadPool>,
-}
-
-impl Scheduler {
- /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
- pub fn new(num_threads: usize) -> Self {
- SchedulerBuilder::new(num_threads).build()
- }
-
- /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
- ///
- /// Returns a [`ExecutionResults`] that can be used to receive results as they are produced,
- /// as a [`futures::Stream`] of [`RecordBatch`]
- ///
- /// [`RecordBatch`]: arrow::record_batch::RecordBatch
- pub fn schedule(
- &self,
- plan: Arc<dyn ExecutionPlan>,
- context: Arc<TaskContext>,
- ) -> Result<ExecutionResults> {
- let plan = PipelinePlanner::new(plan, context).build()?;
- Ok(self.schedule_plan(plan))
- }
-
- /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
- pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults {
- spawn_plan(plan, self.spawner())
- }
-
- fn spawner(&self) -> Spawner {
- Spawner {
- pool: self.pool.clone(),
- }
- }
-}
-
-/// Formats a panic message for a worker
-fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
- let maybe_idx = rayon::current_thread_index();
- let worker: &dyn std::fmt::Display = match &maybe_idx {
- Some(idx) => idx,
- None => &"UNKNOWN",
- };
-
- let message = if let Some(msg) = panic.downcast_ref::<&str>() {
- *msg
- } else if let Some(msg) = panic.downcast_ref::<String>() {
- msg.as_str()
- } else {
- "UNKNOWN"
- };
-
- format!("worker {worker} panicked with: {message}")
-}
-
-/// Returns `true` if the current thread is a rayon worker thread
-///
-/// Note: if there are multiple rayon pools, this will return `true` if the current thread
-/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance
-fn is_worker() -> bool {
- rayon::current_thread_index().is_some()
-}
-
-/// Spawn a [`Task`] onto the local workers thread pool
-///
-/// There is no guaranteed order of execution, as workers may steal at any time. However,
-/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from
-/// the front of their queue, and steal tasks from the back of other workers queues
-///
-/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in
-/// a LIFO order, however, this should not be relied upon
-fn spawn_local(task: Task) {
- // Verify is a worker thread to avoid creating a global pool
- assert!(is_worker(), "must be called from a worker");
- rayon::spawn(|| task.do_work())
-}
-
-/// Spawn a [`Task`] onto the local workers thread pool with fifo ordering
-///
-/// There is no guaranteed order of execution, as workers may steal at any time. However,
-/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop tasks
-/// from the front of their queue, and steal tasks from the back of other workers queues
-///
-/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised
-/// in a FIFO order, however, this should not be relied upon
-fn spawn_local_fifo(task: Task) {
- // Verify is a worker thread to avoid creating a global pool
- assert!(is_worker(), "must be called from a worker");
- rayon::spawn_fifo(|| task.do_work())
-}
-
-#[derive(Debug, Clone)]
-pub(crate) struct Spawner {
- pool: Arc<ThreadPool>,
-}
-
-impl Spawner {
- fn spawn(&self, task: Task) {
- debug!("Spawning {:?} to any worker", task);
- self.pool.spawn(move || task.do_work());
- }
-}
-
-#[cfg(test)]
-mod tests {
- use arrow::util::pretty::pretty_format_batches;
- use std::ops::Range;
- use std::panic::panic_any;
-
- use futures::{StreamExt, TryStreamExt};
- use log::info;
- use rand::distributions::uniform::SampleUniform;
- use rand::{thread_rng, Rng};
-
- use crate::arrow::array::{ArrayRef, PrimitiveArray};
- use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
- use crate::arrow::record_batch::RecordBatch;
- use crate::datasource::{MemTable, TableProvider};
- use crate::physical_plan::displayable;
- use crate::prelude::{SessionConfig, SessionContext};
-
- use super::*;
-
- fn generate_primitive<T, R>(
- rng: &mut R,
- len: usize,
- valid_percent: f64,
- range: Range<T::Native>,
- ) -> ArrayRef
- where
- T: ArrowPrimitiveType,
- T::Native: SampleUniform,
- R: Rng,
- {
- Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
- rng.gen_bool(valid_percent)
- .then(|| rng.gen_range(range.clone()))
- })))
- }
-
- fn generate_batch<R: Rng>(
- rng: &mut R,
- row_count: usize,
- id_offset: i32,
- ) -> RecordBatch {
- let id_range = id_offset..(row_count as i32 + id_offset);
- let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000);
- let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. ..1000.);
- let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
-
- RecordBatch::try_from_iter_with_nullable([
- ("a", a, true),
- ("b", b, true),
- ("id", Arc::new(id), false),
- ])
- .unwrap()
- }
-
- const BATCHES_PER_PARTITION: usize = 20;
- const ROWS_PER_BATCH: usize = 100;
- const NUM_PARTITIONS: usize = 2;
-
- fn make_batches() -> Vec<Vec<RecordBatch>> {
- let mut rng = thread_rng();
-
- let mut id_offset = 0;
-
- (0..NUM_PARTITIONS)
- .map(|_| {
- (0..BATCHES_PER_PARTITION)
- .map(|_| {
- let batch = generate_batch(&mut rng, ROWS_PER_BATCH, id_offset);
- id_offset += ROWS_PER_BATCH as i32;
- batch
- })
- .collect()
- })
- .collect()
- }
-
- fn make_provider() -> Arc<dyn TableProvider> {
- let batches = make_batches();
- let schema = batches.first().unwrap().first().unwrap().schema();
- Arc::new(MemTable::try_new(schema, make_batches()).unwrap())
- }
-
- fn init_logging() {
- let _ = env_logger::builder().is_test(true).try_init();
- }
-
- #[tokio::test]
- async fn test_simple() {
- init_logging();
-
- let scheduler = SchedulerBuilder::new(4)
- .panic_handler(|panic| {
- unreachable!("not expect panic: {:?}", panic);
- })
- .build();
-
- let config = SessionConfig::new().with_target_partitions(4);
- let context = SessionContext::with_config(config);
-
- context.register_table("table1", make_provider()).unwrap();
- context.register_table("table2", make_provider()).unwrap();
-
- let queries = [
- "select * from table1 order by id",
- "select * from table1 where table1.a > 100 order by id",
- "select distinct a from table1 where table1.b > 100 order by a",
- "select * from table1 join table2 on table1.id = table2.id order by table1.id",
- "select id from table1 union all select id from table2 order by id",
- "select id from table1 union all select id from table2 where a > 100 order by id",
- "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
- "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
- "select count(*) from table1 where table1.a > 4",
- "WITH gp AS (SELECT id FROM table1 GROUP BY id)
- SELECT COUNT(CAST(CAST(gp.id || 'xx' AS TIMESTAMP) AS BIGINT)) FROM gp",
- ];
-
- for sql in queries {
- let task = context.task_ctx();
-
- let query = context.sql(sql).await.unwrap();
-
- let plan = query.clone().create_physical_plan().await.unwrap();
-
- info!("Plan: {}", displayable(plan.as_ref()).indent());
-
- let stream = scheduler.schedule(plan, task).unwrap().stream();
- let scheduled: Vec<_> = stream.try_collect().await.unwrap_or_default();
- let expected = query.collect().await.unwrap_or_default();
-
- let total_expected = expected.iter().map(|x| x.num_rows()).sum::<usize>();
- let total_scheduled = scheduled.iter().map(|x| x.num_rows()).sum::<usize>();
- assert_eq!(total_expected, total_scheduled);
-
- info!("Query \"{}\" produced {} rows", sql, total_expected);
-
- let expected = pretty_format_batches(&expected).unwrap().to_string();
- let scheduled = pretty_format_batches(&scheduled).unwrap().to_string();
-
- assert_eq!(
- expected, scheduled,
- "\n\nexpected:\n\n{expected}\nactual:\n\n{scheduled}\n\n"
- );
- }
- }
-
- #[tokio::test]
- async fn test_partitioned() {
- init_logging();
-
- let scheduler = Scheduler::new(4);
-
- let config = SessionConfig::new().with_target_partitions(4);
- let context = SessionContext::with_config(config);
- let plan = context
- .read_table(make_provider())
- .unwrap()
- .create_physical_plan()
- .await
- .unwrap();
-
- assert_eq!(plan.output_partitioning().partition_count(), NUM_PARTITIONS);
-
- let results = scheduler
- .schedule(plan.clone(), context.task_ctx())
- .unwrap();
-
- let batches = results.stream().try_collect::<Vec<_>>().await.unwrap();
- assert_eq!(batches.len(), NUM_PARTITIONS * BATCHES_PER_PARTITION);
-
- for batch in batches {
- assert_eq!(batch.num_rows(), ROWS_PER_BATCH)
- }
-
- let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
- let streams = results.stream_partitioned();
-
- let partitions: Vec<Vec<_>> =
- futures::future::try_join_all(streams.into_iter().map(|s| s.try_collect()))
- .await
- .unwrap();
-
- assert_eq!(partitions.len(), NUM_PARTITIONS);
- for batches in partitions {
- assert_eq!(batches.len(), BATCHES_PER_PARTITION);
- for batch in batches {
- assert_eq!(batch.num_rows(), ROWS_PER_BATCH);
- }
- }
- }
-
- #[tokio::test]
- async fn test_panic() {
- init_logging();
-
- let do_test = |scheduler: Scheduler| {
- scheduler.pool.spawn(|| panic!("test"));
- scheduler.pool.spawn(|| panic!("{}", 1));
- scheduler.pool.spawn(|| panic_any(21));
- };
-
- // The default panic handler should log panics and not abort the process
- do_test(Scheduler::new(1));
-
- // Override panic handler and capture panics to test formatting
- let (sender, receiver) = futures::channel::mpsc::unbounded();
- let scheduler = SchedulerBuilder::new(1)
- .panic_handler(move |panic| {
- let _ = sender.unbounded_send(format_worker_panic(panic));
- })
- .build();
-
- do_test(scheduler);
-
- // Sort as order not guaranteed
- let mut buffer: Vec<_> = receiver.collect().await;
- buffer.sort_unstable();
-
- assert_eq!(buffer.len(), 3);
- assert_eq!(buffer[0], "worker 0 panicked with: 1");
- assert_eq!(buffer[1], "worker 0 panicked with: UNKNOWN");
- assert_eq!(buffer[2], "worker 0 panicked with: test");
- }
-}
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
deleted file mode 100644
index ea1643867b..0000000000
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ /dev/null
@@ -1,307 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-use std::collections::VecDeque;
-use std::fmt::Formatter;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll, Waker};
-
-use futures::{Stream, StreamExt};
-use parking_lot::Mutex;
-
-use crate::arrow::datatypes::SchemaRef;
-use crate::arrow::record_batch::RecordBatch;
-use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::metrics::MetricsSet;
-use crate::physical_plan::{
- displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
-};
-
-use crate::scheduler::pipeline::Pipeline;
-
-/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
-/// converts it to the push-based [`Pipeline`] interface
-///
-/// Internally [`ExecutionPipeline`] is still pull-based which limits its parallelism
-/// to that of its output partitioning, however, it provides full compatibility with
-/// [`ExecutionPlan`] allowing full interoperability with the existing ecosystem
-///
-/// Longer term we will likely want to introduce new traits that differentiate between
-/// pipeline-able operators like filters, and pipeline-breakers like aggregations, and
-/// are better aligned with a push-based execution model.
-///
-/// This in turn will allow for [`Pipeline`] implementations that are able to introduce
-/// parallelism beyond that expressed in their partitioning
-pub struct ExecutionPipeline {
- proxied: Arc<dyn ExecutionPlan>,
- inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
- outputs: Vec<Mutex<SendableRecordBatchStream>>,
-}
-
-impl std::fmt::Debug for ExecutionPipeline {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let tree = debug_tree(self.proxied.as_ref());
- f.debug_tuple("ExecutionNode").field(&tree).finish()
- }
-}
-
-impl ExecutionPipeline {
- pub fn new(
- plan: Arc<dyn ExecutionPlan>,
- task_context: Arc<TaskContext>,
- depth: usize,
- ) -> Result<Self> {
- // The point in the plan at which to splice the plan graph
- let mut splice_point = plan;
- let mut parent_plans = Vec::with_capacity(depth.saturating_sub(1));
- for _ in 0..depth {
- let children = splice_point.children();
- assert_eq!(
- children.len(),
- 1,
- "can only group through nodes with a single child"
- );
- parent_plans.push(splice_point);
- splice_point = children.into_iter().next().unwrap();
- }
-
- // The children to replace with [`ProxyExecutionPlan`]
- let children = splice_point.children();
- let mut inputs = Vec::with_capacity(children.len());
-
- // The spliced plan with its children replaced with [`ProxyExecutionPlan`]
- let spliced = if !children.is_empty() {
- let mut proxies: Vec<Arc<dyn ExecutionPlan>> =
- Vec::with_capacity(children.len());
-
- for child in children {
- let count = child.output_partitioning().partition_count();
-
- let mut child_inputs = Vec::with_capacity(count);
- for _ in 0..count {
- child_inputs.push(Default::default())
- }
-
- inputs.push(child_inputs.clone());
- proxies.push(Arc::new(ProxyExecutionPlan {
- inner: child,
- inputs: child_inputs,
- }));
- }
-
- splice_point.with_new_children(proxies)?
- } else {
- splice_point.clone()
- };
-
- // Reconstruct the parent graph
- let mut proxied = spliced;
- for parent in parent_plans.into_iter().rev() {
- proxied = parent.with_new_children(vec![proxied])?
- }
-
- // Construct the output streams
- let output_count = proxied.output_partitioning().partition_count();
- let outputs = (0..output_count)
- .map(|x| proxied.execute(x, task_context.clone()).map(Mutex::new))
- .collect::<Result<_>>()?;
-
- Ok(Self {
- proxied,
- inputs,
- outputs,
- })
- }
-}
-
-impl Pipeline for ExecutionPipeline {
- /// Push a [`RecordBatch`] to the given input partition
- fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
- let mut partition = self.inputs[child][partition].lock();
- assert!(!partition.is_closed);
-
- partition.buffer.push_back(input);
- for waker in partition.wait_list.drain(..) {
- waker.wake()
- }
- Ok(())
- }
-
- fn close(&self, child: usize, partition: usize) {
- let mut partition = self.inputs[child][partition].lock();
- assert!(!partition.is_closed);
-
- partition.is_closed = true;
- for waker in partition.wait_list.drain(..) {
- waker.wake()
- }
- }
-
- fn output_partitions(&self) -> usize {
- self.outputs.len()
- }
-
- /// Poll an output partition, attempting to get its output
- fn poll_partition(
- &self,
- cx: &mut Context<'_>,
- partition: usize,
- ) -> Poll<Option<Result<RecordBatch>>> {
- self.outputs[partition]
- .lock()
- .poll_next_unpin(cx)
- .map(|opt| opt.map(|r| r.map_err(Into::into)))
- }
-}
-
-#[derive(Debug, Default)]
-struct InputPartition {
- buffer: VecDeque<RecordBatch>,
- wait_list: Vec<Waker>,
- is_closed: bool,
-}
-
-struct InputPartitionStream {
- schema: SchemaRef,
- partition: Arc<Mutex<InputPartition>>,
-}
-
-impl Stream for InputPartitionStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let mut partition = self.partition.lock();
- match partition.buffer.pop_front() {
- Some(batch) => Poll::Ready(Some(Ok(batch))),
- None if partition.is_closed => Poll::Ready(None),
- _ => {
- partition.wait_list.push(cx.waker().clone());
- Poll::Pending
- }
- }
- }
-}
-
-impl RecordBatchStream for InputPartitionStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-/// This is a hack that allows injecting [`InputPartitionStream`] in place of the
-/// streams yielded by the child of the wrapped [`ExecutionPlan`]
-///
-/// This is hopefully temporary pending reworking [`ExecutionPlan`]
-#[derive(Debug)]
-struct ProxyExecutionPlan {
- inner: Arc<dyn ExecutionPlan>,
-
- inputs: Vec<Arc<Mutex<InputPartition>>>,
-}
-
-impl ExecutionPlan for ProxyExecutionPlan {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn schema(&self) -> SchemaRef {
- self.inner.schema()
- }
-
- fn output_partitioning(&self) -> Partitioning {
- self.inner.output_partitioning()
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.inner.output_ordering()
- }
-
- fn required_input_distribution(&self) -> Vec<Distribution> {
- self.inner.required_input_distribution()
- }
-
- fn maintains_input_order(&self) -> Vec<bool> {
- self.inner.maintains_input_order()
- }
-
- fn benefits_from_input_partitioning(&self) -> bool {
- self.inner.benefits_from_input_partitioning()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![]
- }
-
- fn with_new_children(
- self: Arc<Self>,
- _children: Vec<Arc<dyn ExecutionPlan>>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- unimplemented!()
- }
-
- fn execute(
- &self,
- partition: usize,
- _context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- Ok(Box::pin(InputPartitionStream {
- schema: self.schema(),
- partition: self.inputs[partition].clone(),
- }))
- }
-
- fn metrics(&self) -> Option<MetricsSet> {
- self.inner.metrics()
- }
-
- fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
- write!(f, "ProxyExecutionPlan")
- }
-
- fn statistics(&self) -> Statistics {
- self.inner.statistics()
- }
-}
-
-struct NodeDescriptor {
- operator: String,
- children: Vec<NodeDescriptor>,
-}
-
-impl std::fmt::Debug for NodeDescriptor {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct(&self.operator)
- .field("children", &self.children)
- .finish()
- }
-}
-
-fn debug_tree(plan: &dyn ExecutionPlan) -> NodeDescriptor {
- let operator = format!("{}", displayable(plan).one_line());
- let children = plan
- .children()
- .into_iter()
- .map(|x| debug_tree(x.as_ref()))
- .collect();
-
- NodeDescriptor { operator, children }
-}
diff --git a/datafusion/core/src/scheduler/pipeline/mod.rs b/datafusion/core/src/scheduler/pipeline/mod.rs
deleted file mode 100644
index c1838cd1a4..0000000000
--- a/datafusion/core/src/scheduler/pipeline/mod.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::task::{Context, Poll};
-
-use arrow::record_batch::RecordBatch;
-
-use crate::error::Result;
-
-pub mod execution;
-pub mod repartition;
-
-/// A push-based interface used by the scheduler to drive query execution
-///
-/// A pipeline processes data from one or more input partitions, producing output
-/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
-/// more than one upstream [`Pipeline`], input partitions are identified by both
-/// a child index, and a partition index, whereas output partitions are only
-/// identified by a partition index.
-///
-/// This is not intended as an eventual replacement for the physical plan representation
-/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
-/// parts of the physical plan are "compiled" into by the scheduler.
-///
-/// # Eager vs Lazy Execution
-///
-/// Whether computation is eagerly done on push, or lazily done on pull, is
-/// intentionally left as an implementation detail of the [`Pipeline`]
-///
-/// This allows flexibility to support the following different patterns, and potentially more:
-///
-/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
-/// and immediately wakes the corresponding output partition.
-///
-/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
-/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
-/// the job completes. Order and non-order preserving variants are possible
-///
-/// A merge pipeline which combines data from one or more input partitions into one or
-/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
-/// any output partitions that may now be able to make progress. This may be none if
-/// the operator is waiting on data from a different input partition
-///
-/// An aggregation pipeline which combines data from one or more input partitions into
-/// a single output partition. [`Pipeline::push`] would eagerly update the computed
-/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output.
-/// It would also be possible to flush once the partial aggregates reach a certain size
-///
-/// A partition-aware aggregation pipeline, which functions similarly to the above, but
-/// computes aggregations per input partition, before combining these prior to flush.
-///
-/// An async input pipeline, which has no inputs, and wakes the output partition
-/// whenever new data is available
-///
-/// A JIT compiled sequence of synchronous operators, that perform multiple operations
-/// from the physical plan as a single [`Pipeline`]. Parallelized implementations
-/// are also possible
-///
-/// [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
-pub trait Pipeline: Send + Sync + std::fmt::Debug {
- /// Push a [`RecordBatch`] to the given input partition
- fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;
-
- /// Mark an input partition as exhausted
- fn close(&self, child: usize, partition: usize);
-
- /// Returns the number of output partitions
- fn output_partitions(&self) -> usize;
-
- /// Attempt to pull out the next value of the given output partition, registering the
- /// current task for wakeup if the value is not yet available, and returning `None`
- /// if the output partition is exhausted and will never yield any further values
- ///
- /// # Return value
- ///
- /// There are several possible return values:
- ///
- /// - `Poll::Pending` indicates that this partition's next value is not ready yet.
- /// Implementations should use the waker provided by `cx` to notify the scheduler when
- /// progress may be able to be made
- ///
- /// - `Poll::Ready(Some(Ok(val)))` returns the next value from this output partition,
- /// the output partition should be polled again as it may have further values. The returned
- /// value will be routed to the next pipeline in the query
- ///
- /// - `Poll::Ready(Some(Err(e)))` returns an error that will be routed to the query's output
- /// and the query execution aborted.
- ///
- /// - `Poll::Ready(None)` indicates that this partition is exhausted and will not produce any
- /// further values.
- ///
- fn poll_partition(
- &self,
- cx: &mut Context<'_>,
- partition: usize,
- ) -> Poll<Option<Result<RecordBatch>>>;
-}
diff --git a/datafusion/core/src/scheduler/pipeline/repartition.rs b/datafusion/core/src/scheduler/pipeline/repartition.rs
deleted file mode 100644
index 7eeb3c31de..0000000000
--- a/datafusion/core/src/scheduler/pipeline/repartition.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::VecDeque;
-use std::task::{Context, Poll, Waker};
-
-use parking_lot::Mutex;
-
-use crate::arrow::record_batch::RecordBatch;
-use crate::error::Result;
-use crate::physical_plan::repartition::BatchPartitioner;
-use crate::physical_plan::Partitioning;
-
-use crate::scheduler::pipeline::Pipeline;
-
-/// A [`Pipeline`] that can repartition its input
-#[derive(Debug)]
-pub struct RepartitionPipeline {
- output_count: usize,
- state: Mutex<RepartitionState>,
-}
-
-impl RepartitionPipeline {
- /// Create a new [`RepartitionPipeline`] with the given `input` and `output` partitioning
- pub fn try_new(input: Partitioning, output: Partitioning) -> Result<Self> {
- let input_count = input.partition_count();
- let output_count = output.partition_count();
- assert_ne!(input_count, 0);
- assert_ne!(output_count, 0);
-
- // TODO: metrics support
- let partitioner = BatchPartitioner::try_new(output, Default::default())?;
-
- let state = Mutex::new(RepartitionState {
- partitioner,
- partition_closed: vec![false; input_count],
- input_closed: false,
- output_buffers: (0..output_count).map(|_| Default::default()).collect(),
- });
-
- Ok(Self {
- state,
- output_count,
- })
- }
-}
-
-struct RepartitionState {
- partitioner: BatchPartitioner,
- partition_closed: Vec<bool>,
- input_closed: bool,
- output_buffers: Vec<OutputBuffer>,
-}
-
-impl std::fmt::Debug for RepartitionState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("RepartitionState")
- .field("partition_closed", &self.partition_closed)
- .field("input_closed", &self.input_closed)
- .finish()
- }
-}
-
-impl Pipeline for RepartitionPipeline {
- fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
- assert_eq!(child, 0);
-
- let mut state = self.state.lock();
- assert!(
- !state.partition_closed[partition],
- "attempt to push to closed partition {partition} of RepartitionPipeline({state:?})"
- );
-
- let state = &mut *state;
- state.partitioner.partition(input, |partition, batch| {
- state.output_buffers[partition].push_batch(batch);
- Ok(())
- })
- }
-
- fn close(&self, child: usize, partition: usize) {
- assert_eq!(child, 0);
-
- let mut state = self.state.lock();
- assert!(
- !state.partition_closed[partition],
- "attempt to close already closed partition {partition} of RepartitionPipeline({state:?})"
- );
-
- state.partition_closed[partition] = true;
-
- // If all input streams exhausted, wake outputs
- if state.partition_closed.iter().all(|x| *x) {
- state.input_closed = true;
- for buffer in &mut state.output_buffers {
- for waker in buffer.wait_list.drain(..) {
- waker.wake()
- }
- }
- }
- }
-
- fn output_partitions(&self) -> usize {
- self.output_count
- }
-
- fn poll_partition(
- &self,
- cx: &mut Context<'_>,
- partition: usize,
- ) -> Poll<Option<Result<RecordBatch>>> {
- let mut state = self.state.lock();
- let input_closed = state.input_closed;
- let buffer = &mut state.output_buffers[partition];
-
- match buffer.batches.pop_front() {
- Some(batch) => Poll::Ready(Some(Ok(batch))),
- None if input_closed => Poll::Ready(None),
- _ => {
- buffer.wait_list.push(cx.waker().clone());
- Poll::Pending
- }
- }
- }
-}
-
-#[derive(Debug, Default)]
-struct OutputBuffer {
- batches: VecDeque<RecordBatch>,
- wait_list: Vec<Waker>,
-}
-
-impl OutputBuffer {
- fn push_batch(&mut self, batch: RecordBatch) {
- self.batches.push_back(batch);
-
- for waker in self.wait_list.drain(..) {
- waker.wake()
- }
- }
-}
diff --git a/datafusion/core/src/scheduler/plan.rs b/datafusion/core/src/scheduler/plan.rs
deleted file mode 100644
index b5a786a322..0000000000
--- a/datafusion/core/src/scheduler/plan.rs
+++ /dev/null
@@ -1,296 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::datatypes::SchemaRef;
-use std::sync::Arc;
-
-use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::{ExecutionPlan, Partitioning};
-
-use crate::scheduler::pipeline::{
- execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
-};
-
-/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct OutputLink {
- /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to
- pub pipeline: usize,
-
- /// The child of the [`Pipeline`] to route output to
- pub child: usize,
-}
-
-/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
-#[derive(Debug)]
-pub struct RoutablePipeline {
- /// The pipeline that produces data
- pub pipeline: Box<dyn Pipeline>,
-
- /// Where to send output the output of `pipeline`
- ///
- /// If `None`, the output should be sent to the query output
- pub output: Option<OutputLink>,
-}
-
-/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to
-/// [`super::Scheduler::schedule`]. It combines the list of [Pipeline`] with the information
-/// necessary to route output from one stage to the next
-#[derive(Debug)]
-pub struct PipelinePlan {
- /// Schema of this plans output
- pub schema: SchemaRef,
-
- /// Number of output partitions
- pub output_partitions: usize,
-
- /// Pipelines that comprise this plan
- pub pipelines: Vec<RoutablePipeline>,
-}
-
-/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
-/// together multiple operators, [`OperatorGroup`] stores this state
-struct OperatorGroup {
- /// Where to route the output of the eventual [`Pipeline`]
- output: Option<OutputLink>,
-
- /// The [`ExecutionPlan`] from which to start recursing
- root: Arc<dyn ExecutionPlan>,
-
- /// The number of times to recurse into the [`ExecutionPlan`]'s children
- depth: usize,
-}
-
-/// A utility struct to assist converting from [`ExecutionPlan`] to [`PipelinePlan`]
-///
-/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building
-/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited depth-first,
-/// a node is visited only after its parent has been.
-pub struct PipelinePlanner {
- task_context: Arc<TaskContext>,
-
- /// The schema of this plan
- schema: SchemaRef,
-
- /// The number of output partitions of this plan
- output_partitions: usize,
-
- /// The current list of completed pipelines
- completed: Vec<RoutablePipeline>,
-
- /// A list of [`ExecutionPlan`] still to visit, along with
- /// where they should route their output
- to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
-
- /// Stores one or more operators to combine
- /// together into a single [`ExecutionPipeline`]
- execution_operators: Option<OperatorGroup>,
-}
-
-impl PipelinePlanner {
- pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self {
- let schema = plan.schema();
- let output_partitions = plan.output_partitioning().partition_count();
- Self {
- completed: vec![],
- to_visit: vec![(plan, None)],
- task_context,
- execution_operators: None,
- schema,
- output_partitions,
- }
- }
-
- /// Flush the current group of operators stored in `execution_operators`
- /// into a single [`ExecutionPipeline]
- fn flush_exec(&mut self) -> Result<usize> {
- let group = self.execution_operators.take().unwrap();
- let node_idx = self.completed.len();
- self.completed.push(RoutablePipeline {
- pipeline: Box::new(ExecutionPipeline::new(
- group.root,
- self.task_context.clone(),
- group.depth,
- )?),
- output: group.output,
- });
- Ok(node_idx)
- }
-
- /// Visit a non-special cased [`ExecutionPlan`]
- fn visit_exec(
- &mut self,
- plan: Arc<dyn ExecutionPlan>,
- parent: Option<OutputLink>,
- ) -> Result<()> {
- let children = plan.children();
-
- // Add the operator to the current group of operators to be combined
- // into a single [`ExecutionPipeline`].
- //
- // TODO: More sophisticated policy, just because we can combine them doesn't mean we should
- match self.execution_operators.as_mut() {
- Some(buffer) => {
- assert_eq!(parent, buffer.output, "QueryBuilder out of sync");
- buffer.depth += 1;
- }
- None => {
- self.execution_operators = Some(OperatorGroup {
- output: parent,
- root: plan,
- depth: 0,
- })
- }
- }
-
- match children.len() {
- 1 => {
- // Enqueue the children with the parent of the `OperatorGroup`
- self.to_visit
- .push((children.into_iter().next().unwrap(), parent))
- }
- _ => {
- // We can only recursively group through nodes with a single child, therefore
- // if this node has multiple children, we now need to flush the buffer and
- // enqueue its children with this new pipeline as its parent
- let node = self.flush_exec()?;
- self.enqueue_children(children, node);
- }
- }
-
- Ok(())
- }
-
- /// Add the given list of children to the stack of [`ExecutionPlan`] to visit
- fn enqueue_children(
- &mut self,
- children: Vec<Arc<dyn ExecutionPlan>>,
- parent_node_idx: usize,
- ) {
- for (child_idx, child) in children.into_iter().enumerate() {
- self.to_visit.push((
- child,
- Some(OutputLink {
- pipeline: parent_node_idx,
- child: child_idx,
- }),
- ))
- }
- }
-
- /// Push a new [`RoutablePipeline`] and enqueue its children to be visited
- fn push_pipeline(
- &mut self,
- node: RoutablePipeline,
- children: Vec<Arc<dyn ExecutionPlan>>,
- ) {
- let node_idx = self.completed.len();
- self.completed.push(node);
- self.enqueue_children(children, node_idx)
- }
-
- /// Push a new [`RepartitionPipeline`] first flushing any buffered [`OperatorGroup`]
- fn push_repartition(
- &mut self,
- input: Partitioning,
- output: Partitioning,
- parent: Option<OutputLink>,
- children: Vec<Arc<dyn ExecutionPlan>>,
- ) -> Result<()> {
- let parent = match &self.execution_operators {
- Some(buffer) => {
- assert_eq!(buffer.output, parent, "QueryBuilder out of sync");
- Some(OutputLink {
- pipeline: self.flush_exec()?,
- child: 0, // Must be the only child
- })
- }
- None => parent,
- };
-
- let node = Box::new(RepartitionPipeline::try_new(input, output)?);
- self.push_pipeline(
- RoutablePipeline {
- pipeline: node,
- output: parent,
- },
- children,
- );
- Ok(())
- }
-
- /// Visit an [`ExecutionPlan`] operator and add it to the [`PipelinePlan`] being built
- fn visit_operator(
- &mut self,
- plan: Arc<dyn ExecutionPlan>,
- parent: Option<OutputLink>,
- ) -> Result<()> {
- if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
- self.push_repartition(
- repartition.input().output_partitioning(),
- repartition.output_partitioning(),
- parent,
- repartition.children(),
- )
- } else if let Some(coalesce) =
- plan.as_any().downcast_ref::<CoalescePartitionsExec>()
- {
- self.push_repartition(
- coalesce.input().output_partitioning(),
- Partitioning::RoundRobinBatch(1),
- parent,
- coalesce.children(),
- )
- } else {
- self.visit_exec(plan, parent)
- }
- }
-
- /// Build a [`PipelinePlan`] from the [`ExecutionPlan`] provided to [`PipelinePlanner::new`]
- ///
- /// This will group all operators possible into a single [`ExecutionPipeline`], only
- /// creating new pipelines when:
- ///
- /// - encountering an operator with multiple children
- /// - encountering a repartitioning operator
- ///
- /// This latter case is because currently the repartitioning operators in DataFusion are
- /// coupled with the non-scheduler-based parallelism story
- ///
- /// The above logic is liable to change, is considered an implementation detail of the
- /// scheduler, and should not be relied upon by operators
- ///
- pub fn build(mut self) -> Result<PipelinePlan> {
- // We do a depth-first scan of the operator tree, extracting a list of [`QueryNode`]
- while let Some((plan, parent)) = self.to_visit.pop() {
- self.visit_operator(plan, parent)?;
- }
-
- if self.execution_operators.is_some() {
- self.flush_exec()?;
- }
-
- Ok(PipelinePlan {
- schema: self.schema,
- output_partitions: self.output_partitions,
- pipelines: self.completed,
- })
- }
-}
diff --git a/datafusion/core/src/scheduler/task.rs b/datafusion/core/src/scheduler/task.rs
deleted file mode 100644
index bf10186ef2..0000000000
--- a/datafusion/core/src/scheduler/task.rs
+++ /dev/null
@@ -1,509 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::error::{DataFusionError, Result};
-use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use crate::scheduler::{
- is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo, RoutablePipeline,
- Spawner,
-};
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use futures::channel::mpsc;
-use futures::task::ArcWake;
-use futures::{ready, Stream, StreamExt};
-use log::{debug, trace};
-use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Weak};
-use std::task::{Context, Poll};
-
-/// Spawns a [`PipelinePlan`] using the provided [`Spawner`]
-pub(crate) fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults {
- debug!("Spawning pipeline plan: {:#?}", plan);
-
- let (senders, receivers) = (0..plan.output_partitions)
- .map(|_| mpsc::unbounded())
- .unzip::<_, _, Vec<_>, Vec<_>>();
-
- let context = Arc::new(ExecutionContext {
- spawner,
- pipelines: plan.pipelines,
- schema: plan.schema,
- output: senders,
- });
-
- for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() {
- for partition in 0..query_pipeline.pipeline.output_partitions() {
- context.spawner.spawn(Task {
- context: context.clone(),
- waker: Arc::new(TaskWaker {
- context: Arc::downgrade(&context),
- wake_count: AtomicUsize::new(1),
- pipeline: pipeline_idx,
- partition,
- }),
- });
- }
- }
-
- let partitions = receivers
- .into_iter()
- .map(|receiver| ExecutionResultStream {
- receiver,
- context: context.clone(),
- })
- .collect();
-
- ExecutionResults {
- streams: partitions,
- context,
- }
-}
-
-/// A [`Task`] identifies an output partition within a given pipeline that may be able to
-/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
-/// [`Task`] and distributes them amongst its worker threads.
-pub struct Task {
- /// Maintain a link to the [`ExecutionContext`] this is necessary to be able to
- /// route the output of the partition to its destination
- context: Arc<ExecutionContext>,
-
- /// A [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
- waker: Arc<TaskWaker>,
-}
-
-impl std::fmt::Debug for Task {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let output = &self.context.pipelines[self.waker.pipeline].output;
-
- f.debug_struct("Task")
- .field("pipeline", &self.waker.pipeline)
- .field("partition", &self.waker.partition)
- .field("output", &output)
- .finish()
- }
-}
-
-impl Task {
- fn handle_error(
- &self,
- partition: usize,
- routable: &RoutablePipeline,
- error: DataFusionError,
- ) {
- match routable.output {
- Some(link) => {
- // The query output partitioning may not match the current pipeline's
- // but the query output has at least one partition
- // so send error to the first partition of the query output.
- self.context.send_query_output(0, Err(error));
-
- trace!(
- "Closing pipeline: {:?}, partition: {}, due to error",
- link,
- self.waker.partition,
- );
-
- self.context.pipelines[link.pipeline]
- .pipeline
- .close(link.child, self.waker.partition);
- }
- None => self.context.send_query_output(partition, Err(error)),
- }
- }
-
- /// Call [`Pipeline::poll_partition`][super::pipeline::Pipeline::poll_partition],
- /// attempting to make progress on query execution
- pub fn do_work(self) {
- assert!(is_worker(), "Task::do_work called outside of worker pool");
- if self.context.is_cancelled() {
- return;
- }
-
- // Capture the wake count prior to calling [`Pipeline::poll_partition`]
- // this allows us to detect concurrent wake ups and handle them correctly
- let wake_count = self.waker.wake_count.load(Ordering::SeqCst);
-
- let node = self.waker.pipeline;
- let partition = self.waker.partition;
-
- let waker = futures::task::waker_ref(&self.waker);
- let mut cx = Context::from_waker(&waker);
-
- let pipelines = &self.context.pipelines;
- let routable = &pipelines[node];
- match routable.pipeline.poll_partition(&mut cx, partition) {
- Poll::Ready(Some(Ok(batch))) => {
- trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
- match routable.output {
- Some(link) => {
- trace!(
- "Publishing batch to pipeline {:?} partition {}",
- link,
- partition
- );
-
- let r = pipelines[link.pipeline]
- .pipeline
- .push(batch, link.child, partition);
-
- if let Err(e) = r {
- self.handle_error(partition, routable, e);
-
- // Return without rescheduling this output again
- return;
- }
- }
- None => {
- trace!("Publishing batch to output");
- self.context.send_query_output(partition, Ok(batch))
- }
- }
-
- // Reschedule this pipeline again
- //
- // We want to prioritise running tasks triggered by the most recent
- // batch, so reschedule with FIFO ordering
- //
- // Note: We must schedule after we have routed the batch, otherwise
- // we introduce a potential ordering race where the newly scheduled
- // task runs before this task finishes routing the output
- spawn_local_fifo(self);
- }
- Poll::Ready(Some(Err(e))) => {
- trace!("Poll {:?}: Error: {:?}", self, e);
- self.handle_error(partition, routable, e)
- }
- Poll::Ready(None) => {
- trace!("Poll {:?}: None", self);
- match routable.output {
- Some(link) => {
- trace!("Closing pipeline: {:?}, partition: {}", link, partition);
- pipelines[link.pipeline]
- .pipeline
- .close(link.child, partition)
- }
- None => self.context.finish(partition),
- }
- }
- Poll::Pending => {
- trace!("Poll {:?}: Pending", self);
- // Attempt to reset the wake count with the value obtained prior
- // to calling [`Pipeline::poll_partition`].
- //
- // If this fails it indicates a wakeup was received whilst executing
- // [`Pipeline::poll_partition`] and we should reschedule the task
- let reset = self.waker.wake_count.compare_exchange(
- wake_count,
- 0,
- Ordering::SeqCst,
- Ordering::SeqCst,
- );
-
- if reset.is_err() {
- trace!("Wakeup triggered whilst polling: {:?}", self);
- spawn_local(self);
- }
- }
- }
- }
-}
-
-/// The results of the execution of a query
-pub struct ExecutionResults {
- /// [`ExecutionResultStream`] for each partition of this query
- streams: Vec<ExecutionResultStream>,
-
- /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
- context: Arc<ExecutionContext>,
-}
-
-impl ExecutionResults {
- /// Returns a [`SendableRecordBatchStream`] of this execution
- ///
- /// In the event of multiple output partitions, the output will be interleaved
- pub fn stream(self) -> SendableRecordBatchStream {
- let schema = self.context.schema.clone();
- Box::pin(RecordBatchStreamAdapter::new(
- schema,
- futures::stream::select_all(self.streams),
- ))
- }
-
- /// Returns a [`SendableRecordBatchStream`] for each partition of this execution
- pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> {
- self.streams.into_iter().map(|s| Box::pin(s) as _).collect()
- }
-}
-
-/// A result stream for the execution of a query
-struct ExecutionResultStream {
- receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>,
-
- /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
- context: Arc<ExecutionContext>,
-}
-
-impl Stream for ExecutionResultStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten();
- Poll::Ready(opt)
- }
-}
-
-impl RecordBatchStream for ExecutionResultStream {
- fn schema(&self) -> SchemaRef {
- self.context.schema.clone()
- }
-}
-
-/// The shared state of all [`Task`] created from the same [`PipelinePlan`]
-#[derive(Debug)]
-struct ExecutionContext {
- /// Spawner for this query
- spawner: Spawner,
-
- /// List of pipelines that belong to this query, pipelines are addressed
- /// based on their index within this list
- pipelines: Vec<RoutablePipeline>,
-
- /// Schema of this plans output
- pub schema: SchemaRef,
-
- /// The output streams, per partition, for this query's execution
- output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>,
-}
-
-impl Drop for ExecutionContext {
- fn drop(&mut self) {
- debug!("ExecutionContext dropped");
- }
-}
-
-impl ExecutionContext {
- /// Returns `true` if this query has been dropped, specifically if the
- /// stream returned by [`super::Scheduler::schedule`] has been dropped
- fn is_cancelled(&self) -> bool {
- self.output.iter().all(|x| x.is_closed())
- }
-
- /// Sends `output` to this query's output stream
- fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) {
- debug_assert!(
- self.output.len() > partition,
- "the specified partition exceeds the total number of output partitions"
- );
- let _ = self.output[partition].unbounded_send(Some(output));
- }
-
- /// Mark this partition as finished
- fn finish(&self, partition: usize) {
- let _ = self.output[partition].unbounded_send(None);
- }
-}
-
-struct TaskWaker {
- /// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this
- /// [`Waker`] is stored within a `Pipeline` owned by the [`ExecutionContext`]
- ///
- /// [`Waker`]: std::task::Waker
- context: Weak<ExecutionContext>,
-
- /// A counter that stores the number of times this has been awoken
- ///
- /// A value > 0, implies the task is either in the ready queue or
- /// currently being executed
- ///
- /// `TaskWaker::wake` always increments the `wake_count`, however, it only
- /// re-enqueues the [`Task`] if the value prior to increment was 0
- ///
- /// This ensures that a given [`Task`] is not enqueued multiple times
- ///
- /// We store an integer, as opposed to a boolean, so that wake ups that
- /// occur during `Pipeline::poll_partition` can be detected and handled
- /// after it has finished executing
- wake_count: AtomicUsize,
-
- /// The index of the pipeline within `query` to poll
- pipeline: usize,
-
- /// The partition of the pipeline within `query` to poll
- partition: usize,
-}
-
-impl ArcWake for TaskWaker {
- fn wake(self: Arc<Self>) {
- if self.wake_count.fetch_add(1, Ordering::SeqCst) != 0 {
- trace!("Ignoring duplicate wakeup");
- return;
- }
-
- if let Some(context) = self.context.upgrade() {
- let task = Task {
- context,
- waker: self.clone(),
- };
-
- trace!("Wakeup {:?}", task);
-
- // If called from a worker, spawn to the current worker's
- // local queue, otherwise reschedule on any worker
- match is_worker() {
- true => spawn_local(task),
- false => task.context.spawner.clone().spawn(task),
- }
- } else {
- trace!("Dropped wakeup");
- }
- }
-
- fn wake_by_ref(s: &Arc<Self>) {
- ArcWake::wake(s.clone())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::error::Result;
- use crate::scheduler::{pipeline::Pipeline, plan::RoutablePipeline, Scheduler};
- use arrow::array::{ArrayRef, Int32Array};
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow::record_batch::RecordBatch;
- use futures::{channel::oneshot, ready, FutureExt, StreamExt};
- use parking_lot::Mutex;
- use std::fmt::Debug;
- use std::time::Duration;
-
- /// Tests that waker can be sent to tokio pool
- #[derive(Debug)]
- struct TokioPipeline {
- handle: tokio::runtime::Handle,
- state: Mutex<State>,
- }
-
- #[derive(Debug)]
- enum State {
- Init,
- Wait(oneshot::Receiver<Result<RecordBatch>>),
- Finished,
- }
-
- impl Default for State {
- fn default() -> Self {
- Self::Init
- }
- }
-
- impl Pipeline for TokioPipeline {
- fn push(
- &self,
- _input: RecordBatch,
- _child: usize,
- _partition: usize,
- ) -> Result<()> {
- unreachable!()
- }
-
- fn close(&self, _child: usize, _partition: usize) {}
-
- fn output_partitions(&self) -> usize {
- 1
- }
-
- fn poll_partition(
- &self,
- cx: &mut Context<'_>,
- _partition: usize,
- ) -> Poll<Option<Result<RecordBatch>>> {
- let mut state = self.state.lock();
- loop {
- match &mut *state {
- State::Init => {
- let (sender, receiver) = oneshot::channel();
- self.handle.spawn(async move {
- tokio::time::sleep(Duration::from_millis(10)).await;
- let array = Int32Array::from_iter_values([1, 2, 3]);
- sender.send(
- RecordBatch::try_from_iter([(
- "int",
- Arc::new(array) as ArrayRef,
- )])
- .map_err(DataFusionError::ArrowError),
- )
- });
- *state = State::Wait(receiver)
- }
- State::Wait(r) => {
- let v = ready!(r.poll_unpin(cx)).ok();
- *state = State::Finished;
- return Poll::Ready(v);
- }
- State::Finished => return Poll::Ready(None),
- }
- }
- }
- }
-
- #[test]
- fn test_tokio_waker() {
- let scheduler = Scheduler::new(2);
-
- // A tokio runtime
- let runtime = tokio::runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- // A pipeline that dispatches to a tokio worker
- let pipeline = TokioPipeline {
- handle: runtime.handle().clone(),
- state: Default::default(),
- };
-
- let plan = PipelinePlan {
- schema: Arc::new(Schema::new(vec![Field::new(
- "int",
- DataType::Int32,
- false,
- )])),
- output_partitions: 1,
- pipelines: vec![RoutablePipeline {
- pipeline: Box::new(pipeline),
- output: None,
- }],
- };
-
- let mut receiver = scheduler.schedule_plan(plan).stream();
-
- runtime.block_on(async move {
- // Should wait for output
- let batch = receiver.next().await.unwrap().unwrap();
- assert_eq!(batch.num_rows(), 3);
-
- // Next batch should be none
- assert!(receiver.next().await.is_none());
- })
- }
-}