Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/30 18:57:48 UTC

[arrow-datafusion] branch main updated: Remove Rayon-based Scheduler (#6169)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 71efcf5ee8 Remove Rayon-based Scheduler (#6169)
71efcf5ee8 is described below

commit 71efcf5ee8900a1efe12fb812e210e6941060733
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun Apr 30 19:57:41 2023 +0100

    Remove Rayon-based Scheduler (#6169)
    
    * Remove scheduler
    
    * Fix tpch
    
    * Format
---
 .github/workflows/docs_pr.yaml                     |   2 +-
 .github/workflows/rust.yml                         |  11 +-
 benchmarks/Cargo.toml                              |   2 +-
 benchmarks/src/bin/tpch.rs                         |  34 +-
 ci/scripts/rust_clippy.sh                          |   2 +-
 datafusion/core/Cargo.toml                         |   4 -
 datafusion/core/benches/parquet_query_sql.rs       |  19 -
 datafusion/core/src/lib.rs                         |   2 -
 datafusion/core/src/scheduler/mod.rs               | 460 -------------------
 .../core/src/scheduler/pipeline/execution.rs       | 307 -------------
 datafusion/core/src/scheduler/pipeline/mod.rs      | 111 -----
 .../core/src/scheduler/pipeline/repartition.rs     | 155 -------
 datafusion/core/src/scheduler/plan.rs              | 296 ------------
 datafusion/core/src/scheduler/task.rs              | 509 ---------------------
 14 files changed, 15 insertions(+), 1899 deletions(-)
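
For anyone consuming this change downstream, the migration off the removed API is mechanical. A minimal before/after sketch, assembled from the benchmarks/src/bin/tpch.rs hunks below (the `physical_plan` and `state` bindings follow that file; this is an illustration, not new API):

    // Before (required the now-removed "scheduler" feature):
    //     let scheduler = Scheduler::new(num_cpus::get());
    //     let results = scheduler.schedule(physical_plan.clone(), state.task_ctx())?;
    //     let batches: Vec<RecordBatch> = results.stream().try_collect().await?;
    //
    // After: execution always goes through the Tokio-based collect() path.
    let batches: Vec<RecordBatch> = collect(physical_plan.clone(), state.task_ctx()).await?;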

diff --git a/.github/workflows/docs_pr.yaml b/.github/workflows/docs_pr.yaml
index 29908abba0..821321c8c5 100644
--- a/.github/workflows/docs_pr.yaml
+++ b/.github/workflows/docs_pr.yaml
@@ -49,6 +49,6 @@ jobs:
           rust-version: stable
       # Note: this does not include dictionary_expressions to reduce codegen
       - name: Run doctests
-        run: cargo test --doc --features avro,scheduler,json
+        run: cargo test --doc --features avro,json
       - name: Verify Working Directory Clean
         run: git diff --exit-code
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e53d66945d..654c0d5649 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -67,8 +67,7 @@ jobs:
 
       # Note: this does not include dictionary_expressions to reduce codegen
       - name: Check workspace with all features
-        run: cargo check --workspace --benches --features avro,scheduler,json
-
+        run: cargo check --workspace --benches --features avro,json
       - name: Check Cargo.lock for datafusion-cli
         run: |
           # If this test fails, try running `cargo update` in the `datafusion-cli` directory
@@ -97,7 +96,7 @@ jobs:
         with:
           rust-version: stable
       - name: Run tests (excluding doctests)
-        run: cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
+        run: cargo test --lib --tests --bins --features avro,json,dictionary_expressions
       - name: Verify Working Directory Clean
         run: git diff --exit-code
 
@@ -153,7 +152,7 @@ jobs:
           rust-version: stable
       # Note: this does not include dictionary_expressions to reduce codegen
       - name: Run doctests
-        run: cargo test --doc --features avro,scheduler,json
+        run: cargo test --doc --features avro,json
       - name: Verify Working Directory Clean
         run: git diff --exit-code
 
@@ -272,7 +271,7 @@ jobs:
         shell: bash
         run: |
           export PATH=$PATH:$HOME/d/protoc/bin
-          cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
+          cargo test --lib --tests --bins --features avro,json,dictionary_expressions
         env:
           # do not produce debug symbols to keep memory usage down
           RUSTFLAGS: "-C debuginfo=0"
@@ -305,7 +304,7 @@ jobs:
       - name: Run tests (excluding doctests)
         shell: bash
         run: |
-          cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
+          cargo test --lib --tests --bins --features avro,json,dictionary_expressions
         env:
           # do not produce debug symbols to keep memory usage down
           RUSTFLAGS: "-C debuginfo=0"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index ce0545ba00..91effa3707 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,7 +34,7 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 arrow = { workspace = true }
-datafusion = { path = "../datafusion/core", version = "23.0.0", features = ["scheduler"] }
+datafusion = { path = "../datafusion/core", version = "23.0.0" }
 env_logger = "0.10"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 36da520cb7..43659a6f9f 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -37,8 +37,6 @@ use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Instant};
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::scheduler::Scheduler;
-use futures::TryStreamExt;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -90,10 +88,6 @@ struct DataFusionBenchmarkOpt {
     /// Whether to disable collection of statistics (and cost based optimizations) or not.
     #[structopt(short = "S", long = "disable-statistics")]
     disable_statistics: bool,
-
-    /// Enable scheduler
-    #[structopt(short = "e", long = "enable-scheduler")]
-    enable_scheduler: bool,
 }
 
 #[derive(Debug, StructOpt)]
@@ -227,16 +221,14 @@ async fn benchmark_query(
         if query_id == 15 {
             for (n, query) in sql.iter().enumerate() {
                 if n == 1 {
-                    result = execute_query(&ctx, query, opt.debug, opt.enable_scheduler)
-                        .await?;
+                    result = execute_query(&ctx, query, opt.debug).await?;
                 } else {
-                    execute_query(&ctx, query, opt.debug, opt.enable_scheduler).await?;
+                    execute_query(&ctx, query, opt.debug).await?;
                 }
             }
         } else {
             for query in sql {
-                result =
-                    execute_query(&ctx, query, opt.debug, opt.enable_scheduler).await?;
+                result = execute_query(&ctx, query, opt.debug).await?;
             }
         }
 
@@ -295,7 +287,6 @@ async fn execute_query(
     ctx: &SessionContext,
     sql: &str,
     debug: bool,
-    enable_scheduler: bool,
 ) -> Result<Vec<RecordBatch>> {
     let plan = ctx.sql(sql).await?;
     let (state, plan) = plan.into_parts();
@@ -315,15 +306,7 @@ async fn execute_query(
             displayable(physical_plan.as_ref()).indent()
         );
     }
-    let result = if enable_scheduler {
-        let scheduler = Scheduler::new(num_cpus::get());
-        let results = scheduler
-            .schedule(physical_plan.clone(), state.task_ctx())
-            .unwrap();
-        results.stream().try_collect().await?
-    } else {
-        collect(physical_plan.clone(), state.task_ctx()).await?
-    };
+    let result = collect(physical_plan.clone(), state.task_ctx()).await?;
     if debug {
         println!(
             "=== Physical plan with metrics ===\n{}\n",
@@ -529,8 +512,7 @@ mod tests {
             // handle special q15 which contains "create view" sql statement
             if sql.starts_with("select") {
                 let explain = "explain ".to_string() + sql;
-                let result_batch =
-                    execute_query(&ctx, explain.as_str(), false, false).await?;
+                let result_batch = execute_query(&ctx, explain.as_str(), false).await?;
                 if !actual.is_empty() {
                     actual += "\n";
                 }
@@ -542,7 +524,7 @@ mod tests {
                 // let mut file = File::create(format!("expected-plans/q{}.txt", query))?;
                 // file.write_all(actual.as_bytes())?;
             } else {
-                execute_query(&ctx, sql.as_str(), false, false).await?;
+                execute_query(&ctx, sql.as_str(), false).await?;
             }
         }
 
@@ -726,7 +708,7 @@ mod tests {
 
         let sql = &get_query_sql(n)?;
         for query in sql {
-            execute_query(&ctx, query, false, false).await?;
+            execute_query(&ctx, query, false).await?;
         }
 
         Ok(())
@@ -757,7 +739,6 @@ mod ci {
             mem_table: false,
             output_path: None,
             disable_statistics: false,
-            enable_scheduler: false,
         };
         register_tables(&opt, &ctx).await?;
         let queries = get_query_sql(query)?;
@@ -1064,7 +1045,6 @@ mod ci {
             mem_table: false,
             output_path: None,
             disable_statistics: false,
-            enable_scheduler: false,
         };
         let mut results = benchmark_datafusion(opt).await?;
         assert_eq!(results.len(), 1);
diff --git a/ci/scripts/rust_clippy.sh b/ci/scripts/rust_clippy.sh
index d22a098905..dfd2916981 100755
--- a/ci/scripts/rust_clippy.sh
+++ b/ci/scripts/rust_clippy.sh
@@ -18,4 +18,4 @@
 # under the License.
 
 set -ex
-cargo clippy --all-targets --workspace --features avro,pyarrow,scheduler -- -D warnings
+cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 1fe9947a20..023848641f 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -46,8 +46,6 @@ dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions", "da
 force_hash_collisions = []
 pyarrow = ["datafusion-common/pyarrow"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
-# Used to enable scheduler
-scheduler = ["rayon"]
 simd = ["arrow/simd"]
 unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusion-optimizer/unicode_expressions", "datafusion-sql/unicode_expressions"]
 
@@ -86,7 +84,6 @@ parquet = { workspace = true }
 percent-encoding = "2.2.0"
 pin-project-lite = "^0.2.7"
 rand = "0.8"
-rayon = { version = "1.5", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 sqlparser = { version = "0.33", features = ["visitor"] }
 tempfile = "3"
@@ -150,7 +147,6 @@ name = "physical_plan"
 [[bench]]
 harness = false
 name = "parquet_query_sql"
-required-features = ["scheduler"]
 
 [[bench]]
 harness = false
diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs
index 9b0d809629..876b1fe7e1 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -25,7 +25,6 @@ use arrow::datatypes::{
 use arrow::record_batch::RecordBatch;
 use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion::scheduler::Scheduler;
 use futures::stream::StreamExt;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{WriterProperties, WriterVersion};
@@ -196,8 +195,6 @@ fn criterion_benchmark(c: &mut Criterion) {
     let config = SessionConfig::new().with_target_partitions(partitions);
     let context = SessionContext::with_config(config);
 
-    let scheduler = Scheduler::new(partitions);
-
     let local_rt = tokio::runtime::Builder::new_current_thread()
         .build()
         .unwrap();
@@ -249,22 +246,6 @@ fn criterion_benchmark(c: &mut Criterion) {
                 })
             });
         });
-
-        c.bench_function(&format!("scheduled: {query}"), |b| {
-            b.iter(|| {
-                let query = query.clone();
-                let context = context.clone();
-
-                local_rt.block_on(async {
-                    let query = context.sql(&query).await.unwrap();
-                    let plan = query.create_physical_plan().await.unwrap();
-                    let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
-
-                    let mut stream = results.stream();
-                    while stream.next().await.transpose().unwrap().is_some() {}
-                });
-            });
-        });
     }
 
     // Temporary file must outlive the benchmarks, it is deleted when dropped
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index ddc63c157f..4082a22daa 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -422,8 +422,6 @@ pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod prelude;
 pub mod scalar;
-#[cfg(feature = "scheduler")]
-pub mod scheduler;
 pub mod variable;
 
 // re-export dependencies from arrow-rs to minimise version maintenance for crate users
diff --git a/datafusion/core/src/scheduler/mod.rs b/datafusion/core/src/scheduler/mod.rs
deleted file mode 100644
index 790c990467..0000000000
--- a/datafusion/core/src/scheduler/mod.rs
+++ /dev/null
@@ -1,460 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
-//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
-//! and is designed to decouple the execution parallelism from the parallelism expressed in
-//! the physical plan as partitions.
-//!
-//! # Implementation
-//!
-//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
-//! chunks called pipelines. Each pipeline may consist of one or more nodes from the
-//! [`ExecutionPlan`] tree.
-//!
-//! The scheduler then maintains a list of pending `Task`s that identify a partition within
-//! a particular pipeline that may be able to make progress on some "morsel" of data. These
-//! `Task`s are then scheduled on the worker pool, with a preference for scheduling work
-//! on a given "morsel" on the same thread that produced it.
-//!
-//! # Rayon
-//!
-//! Under the hood, these `Task`s are scheduled by [rayon], a lightweight, work-stealing
-//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, using
-//! [rayon]'s structured concurrency primitives to express additional parallelism that
-//! can be exploited if there are idle threads available at runtime.
-//!
-//! # Shutdown
-//!
-//! Queries scheduled on a [`Scheduler`] will run to completion even if the
-//! [`Scheduler`] is dropped.
-//!
-//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
-//! [rayon]: https://docs.rs/rayon/latest/rayon/
-//!
-//! # Example
-//!
-//! ```rust
-//! # use futures::TryStreamExt;
-//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
-//! # use datafusion::scheduler::Scheduler;
-//!
-//! # #[tokio::main]
-//! # async fn main() {
-//! let scheduler = Scheduler::new(4);
-//! let config = SessionConfig::new().with_target_partitions(4);
-//! let context = SessionContext::with_config(config);
-//!
-//! context.register_csv("example", "../core/tests/data/example.csv", CsvReadOptions::new()).await.unwrap();
-//! let plan = context.sql("SELECT MIN(b) FROM example")
-//!     .await
-//!     .unwrap()
-//!     .create_physical_plan()
-//!     .await
-//!     .unwrap();
-//!
-//! let task = context.task_ctx();
-//! let results = scheduler.schedule(plan, task).unwrap();
-//! let scheduled: Vec<_> = results.stream().try_collect().await.unwrap();
-//! # }
-//! ```
-//!
-
-use std::sync::Arc;
-
-use log::{debug, error};
-
-use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::physical_plan::ExecutionPlan;
-
-use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
-use task::{spawn_plan, Task};
-
-use rayon::{ThreadPool, ThreadPoolBuilder};
-
-pub use task::ExecutionResults;
-
-mod pipeline;
-mod plan;
-mod task;
-
-/// Builder for a [`Scheduler`]
-#[derive(Debug)]
-pub struct SchedulerBuilder {
-    inner: ThreadPoolBuilder,
-}
-
-impl SchedulerBuilder {
-    /// Create a new [`SchedulerBuilder`] with the provided number of threads
-    pub fn new(num_threads: usize) -> Self {
-        let builder = ThreadPoolBuilder::new()
-            .num_threads(num_threads)
-            .panic_handler(|p| error!("{}", format_worker_panic(p)))
-            .thread_name(|idx| format!("df-worker-{idx}"));
-
-        Self { inner: builder }
-    }
-
-    /// Registers a custom panic handler
-    #[cfg(test)]
-    fn panic_handler<H>(self, panic_handler: H) -> Self
-    where
-        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
-    {
-        Self {
-            inner: self.inner.panic_handler(panic_handler),
-        }
-    }
-
-    /// Build a new [`Scheduler`]
-    fn build(self) -> Scheduler {
-        Scheduler {
-            pool: Arc::new(self.inner.build().unwrap()),
-        }
-    }
-}
-
-/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool
-pub struct Scheduler {
-    pool: Arc<ThreadPool>,
-}
-
-impl Scheduler {
-    /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
-    pub fn new(num_threads: usize) -> Self {
-        SchedulerBuilder::new(num_threads).build()
-    }
-
-    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
-    ///
-    /// Returns a [`ExecutionResults`] that can be used to receive results as they are produced,
-    /// as a [`futures::Stream`] of [`RecordBatch`]
-    ///
-    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
-    pub fn schedule(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        context: Arc<TaskContext>,
-    ) -> Result<ExecutionResults> {
-        let plan = PipelinePlanner::new(plan, context).build()?;
-        Ok(self.schedule_plan(plan))
-    }
-
-    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
-    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults {
-        spawn_plan(plan, self.spawner())
-    }
-
-    fn spawner(&self) -> Spawner {
-        Spawner {
-            pool: self.pool.clone(),
-        }
-    }
-}
-
-/// Formats a panic message for a worker
-fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
-    let maybe_idx = rayon::current_thread_index();
-    let worker: &dyn std::fmt::Display = match &maybe_idx {
-        Some(idx) => idx,
-        None => &"UNKNOWN",
-    };
-
-    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
-        *msg
-    } else if let Some(msg) = panic.downcast_ref::<String>() {
-        msg.as_str()
-    } else {
-        "UNKNOWN"
-    };
-
-    format!("worker {worker} panicked with: {message}")
-}
-
-/// Returns `true` if the current thread is a rayon worker thread
-///
-/// Note: if there are multiple rayon pools, this will return `true` if the current thread
-/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance
-fn is_worker() -> bool {
-    rayon::current_thread_index().is_some()
-}
-
-/// Spawn a [`Task`] onto the local workers thread pool
-///
-/// There is no guaranteed order of execution, as workers may steal at any time. However,
-/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from
-/// the front of their queue, and steal tasks from the back of other workers' queues.
-///
-/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in
-/// a LIFO order; however, this should not be relied upon.
-fn spawn_local(task: Task) {
-    // Verify this is a worker thread to avoid creating a global pool
-    assert!(is_worker(), "must be called from a worker");
-    rayon::spawn(|| task.do_work())
-}
-
-/// Spawn a [`Task`] onto the local workers thread pool with fifo ordering
-///
-/// There is no guaranteed order of execution, as workers may steal at any time. However,
-/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop tasks
-/// from the front of their queue, and steal tasks from the back of other workers' queues.
-///
-/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised
-/// in a FIFO order; however, this should not be relied upon.
-fn spawn_local_fifo(task: Task) {
-    // Verify this is a worker thread to avoid creating a global pool
-    assert!(is_worker(), "must be called from a worker");
-    rayon::spawn_fifo(|| task.do_work())
-}
-
-#[derive(Debug, Clone)]
-pub(crate) struct Spawner {
-    pool: Arc<ThreadPool>,
-}
-
-impl Spawner {
-    fn spawn(&self, task: Task) {
-        debug!("Spawning {:?} to any worker", task);
-        self.pool.spawn(move || task.do_work());
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use arrow::util::pretty::pretty_format_batches;
-    use std::ops::Range;
-    use std::panic::panic_any;
-
-    use futures::{StreamExt, TryStreamExt};
-    use log::info;
-    use rand::distributions::uniform::SampleUniform;
-    use rand::{thread_rng, Rng};
-
-    use crate::arrow::array::{ArrayRef, PrimitiveArray};
-    use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
-    use crate::arrow::record_batch::RecordBatch;
-    use crate::datasource::{MemTable, TableProvider};
-    use crate::physical_plan::displayable;
-    use crate::prelude::{SessionConfig, SessionContext};
-
-    use super::*;
-
-    fn generate_primitive<T, R>(
-        rng: &mut R,
-        len: usize,
-        valid_percent: f64,
-        range: Range<T::Native>,
-    ) -> ArrayRef
-    where
-        T: ArrowPrimitiveType,
-        T::Native: SampleUniform,
-        R: Rng,
-    {
-        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
-            rng.gen_bool(valid_percent)
-                .then(|| rng.gen_range(range.clone()))
-        })))
-    }
-
-    fn generate_batch<R: Rng>(
-        rng: &mut R,
-        row_count: usize,
-        id_offset: i32,
-    ) -> RecordBatch {
-        let id_range = id_offset..(row_count as i32 + id_offset);
-        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000);
-        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. ..1000.);
-        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
-
-        RecordBatch::try_from_iter_with_nullable([
-            ("a", a, true),
-            ("b", b, true),
-            ("id", Arc::new(id), false),
-        ])
-        .unwrap()
-    }
-
-    const BATCHES_PER_PARTITION: usize = 20;
-    const ROWS_PER_BATCH: usize = 100;
-    const NUM_PARTITIONS: usize = 2;
-
-    fn make_batches() -> Vec<Vec<RecordBatch>> {
-        let mut rng = thread_rng();
-
-        let mut id_offset = 0;
-
-        (0..NUM_PARTITIONS)
-            .map(|_| {
-                (0..BATCHES_PER_PARTITION)
-                    .map(|_| {
-                        let batch = generate_batch(&mut rng, ROWS_PER_BATCH, id_offset);
-                        id_offset += ROWS_PER_BATCH as i32;
-                        batch
-                    })
-                    .collect()
-            })
-            .collect()
-    }
-
-    fn make_provider() -> Arc<dyn TableProvider> {
-        let batches = make_batches();
-        let schema = batches.first().unwrap().first().unwrap().schema();
-        Arc::new(MemTable::try_new(schema, make_batches()).unwrap())
-    }
-
-    fn init_logging() {
-        let _ = env_logger::builder().is_test(true).try_init();
-    }
-
-    #[tokio::test]
-    async fn test_simple() {
-        init_logging();
-
-        let scheduler = SchedulerBuilder::new(4)
-            .panic_handler(|panic| {
-                unreachable!("unexpected panic: {:?}", panic);
-            })
-            .build();
-
-        let config = SessionConfig::new().with_target_partitions(4);
-        let context = SessionContext::with_config(config);
-
-        context.register_table("table1", make_provider()).unwrap();
-        context.register_table("table2", make_provider()).unwrap();
-
-        let queries = [
-            "select * from table1 order by id",
-            "select * from table1 where table1.a > 100 order by id",
-            "select distinct a from table1 where table1.b > 100 order by a",
-            "select * from table1 join table2 on table1.id = table2.id order by table1.id",
-            "select id from table1 union all select id from table2 order by id",
-            "select id from table1 union all select id from table2 where a > 100 order by id",
-            "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
-            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
-            "select count(*) from table1 where table1.a > 4",
-            "WITH gp AS (SELECT id FROM table1 GROUP BY id)
-            SELECT COUNT(CAST(CAST(gp.id || 'xx' AS TIMESTAMP) AS BIGINT)) FROM gp",
-        ];
-
-        for sql in queries {
-            let task = context.task_ctx();
-
-            let query = context.sql(sql).await.unwrap();
-
-            let plan = query.clone().create_physical_plan().await.unwrap();
-
-            info!("Plan: {}", displayable(plan.as_ref()).indent());
-
-            let stream = scheduler.schedule(plan, task).unwrap().stream();
-            let scheduled: Vec<_> = stream.try_collect().await.unwrap_or_default();
-            let expected = query.collect().await.unwrap_or_default();
-
-            let total_expected = expected.iter().map(|x| x.num_rows()).sum::<usize>();
-            let total_scheduled = scheduled.iter().map(|x| x.num_rows()).sum::<usize>();
-            assert_eq!(total_expected, total_scheduled);
-
-            info!("Query \"{}\" produced {} rows", sql, total_expected);
-
-            let expected = pretty_format_batches(&expected).unwrap().to_string();
-            let scheduled = pretty_format_batches(&scheduled).unwrap().to_string();
-
-            assert_eq!(
-                expected, scheduled,
-                "\n\nexpected:\n\n{expected}\nactual:\n\n{scheduled}\n\n"
-            );
-        }
-    }
-
-    #[tokio::test]
-    async fn test_partitioned() {
-        init_logging();
-
-        let scheduler = Scheduler::new(4);
-
-        let config = SessionConfig::new().with_target_partitions(4);
-        let context = SessionContext::with_config(config);
-        let plan = context
-            .read_table(make_provider())
-            .unwrap()
-            .create_physical_plan()
-            .await
-            .unwrap();
-
-        assert_eq!(plan.output_partitioning().partition_count(), NUM_PARTITIONS);
-
-        let results = scheduler
-            .schedule(plan.clone(), context.task_ctx())
-            .unwrap();
-
-        let batches = results.stream().try_collect::<Vec<_>>().await.unwrap();
-        assert_eq!(batches.len(), NUM_PARTITIONS * BATCHES_PER_PARTITION);
-
-        for batch in batches {
-            assert_eq!(batch.num_rows(), ROWS_PER_BATCH)
-        }
-
-        let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
-        let streams = results.stream_partitioned();
-
-        let partitions: Vec<Vec<_>> =
-            futures::future::try_join_all(streams.into_iter().map(|s| s.try_collect()))
-                .await
-                .unwrap();
-
-        assert_eq!(partitions.len(), NUM_PARTITIONS);
-        for batches in partitions {
-            assert_eq!(batches.len(), BATCHES_PER_PARTITION);
-            for batch in batches {
-                assert_eq!(batch.num_rows(), ROWS_PER_BATCH);
-            }
-        }
-    }
-
-    #[tokio::test]
-    async fn test_panic() {
-        init_logging();
-
-        let do_test = |scheduler: Scheduler| {
-            scheduler.pool.spawn(|| panic!("test"));
-            scheduler.pool.spawn(|| panic!("{}", 1));
-            scheduler.pool.spawn(|| panic_any(21));
-        };
-
-        // The default panic handler should log panics and not abort the process
-        do_test(Scheduler::new(1));
-
-        // Override panic handler and capture panics to test formatting
-        let (sender, receiver) = futures::channel::mpsc::unbounded();
-        let scheduler = SchedulerBuilder::new(1)
-            .panic_handler(move |panic| {
-                let _ = sender.unbounded_send(format_worker_panic(panic));
-            })
-            .build();
-
-        do_test(scheduler);
-
-        // Sort as order not guaranteed
-        let mut buffer: Vec<_> = receiver.collect().await;
-        buffer.sort_unstable();
-
-        assert_eq!(buffer.len(), 3);
-        assert_eq!(buffer[0], "worker 0 panicked with: 1");
-        assert_eq!(buffer[1], "worker 0 panicked with: UNKNOWN");
-        assert_eq!(buffer[2], "worker 0 panicked with: test");
-    }
-}
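
For context on the `spawn_local` / `spawn_local_fifo` helpers removed above: they lean on rayon's work-stealing deque semantics. A minimal, self-contained sketch of that behaviour, offered as an illustration only (as the removed comments stress, the ordering must not be relied upon):

    use rayon::ThreadPoolBuilder;

    fn main() {
        // A single-threaded pool makes the local queue order observable,
        // since no other worker can steal from the far end of the deque.
        let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
        pool.scope(|s| {
            // Scope spawns, like rayon::spawn on a worker thread, go to the
            // front of the local deque, so the most recently spawned task
            // tends to run first (LIFO); rayon::spawn_fifo queues at the
            // back instead, giving the FIFO-leaning behaviour.
            s.spawn(|_| println!("task A (spawned first)"));
            s.spawn(|_| println!("task B (spawned second, typically runs first)"));
        });
    }
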
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
deleted file mode 100644
index ea1643867b..0000000000
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ /dev/null
@@ -1,307 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-use std::collections::VecDeque;
-use std::fmt::Formatter;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll, Waker};
-
-use futures::{Stream, StreamExt};
-use parking_lot::Mutex;
-
-use crate::arrow::datatypes::SchemaRef;
-use crate::arrow::record_batch::RecordBatch;
-use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::metrics::MetricsSet;
-use crate::physical_plan::{
-    displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
-};
-
-use crate::scheduler::pipeline::Pipeline;
-
-/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
-/// converts it to the push-based [`Pipeline`] interface
-///
-/// Internally, [`ExecutionPipeline`] is still pull-based, which limits its parallelism
-/// to that of its output partitioning, however, it provides full compatibility with
-/// [`ExecutionPlan`] allowing full interoperability with the existing ecosystem
-///
-/// Longer term we will likely want to introduce new traits that differentiate between
-/// pipeline-able operators like filters, and pipeline-breakers like aggregations, and
-/// are better aligned with a push-based execution model.
-///
-/// This in turn will allow for [`Pipeline`] implementations that are able to introduce
-/// parallelism beyond that expressed in their partitioning
-pub struct ExecutionPipeline {
-    proxied: Arc<dyn ExecutionPlan>,
-    inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
-    outputs: Vec<Mutex<SendableRecordBatchStream>>,
-}
-
-impl std::fmt::Debug for ExecutionPipeline {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let tree = debug_tree(self.proxied.as_ref());
-        f.debug_tuple("ExecutionNode").field(&tree).finish()
-    }
-}
-
-impl ExecutionPipeline {
-    pub fn new(
-        plan: Arc<dyn ExecutionPlan>,
-        task_context: Arc<TaskContext>,
-        depth: usize,
-    ) -> Result<Self> {
-        // The point in the plan at which to splice the plan graph
-        let mut splice_point = plan;
-        let mut parent_plans = Vec::with_capacity(depth.saturating_sub(1));
-        for _ in 0..depth {
-            let children = splice_point.children();
-            assert_eq!(
-                children.len(),
-                1,
-                "can only group through nodes with a single child"
-            );
-            parent_plans.push(splice_point);
-            splice_point = children.into_iter().next().unwrap();
-        }
-
-        // The children to replace with [`ProxyExecutionPlan`]
-        let children = splice_point.children();
-        let mut inputs = Vec::with_capacity(children.len());
-
-        // The spliced plan with its children replaced with [`ProxyExecutionPlan`]
-        let spliced = if !children.is_empty() {
-            let mut proxies: Vec<Arc<dyn ExecutionPlan>> =
-                Vec::with_capacity(children.len());
-
-            for child in children {
-                let count = child.output_partitioning().partition_count();
-
-                let mut child_inputs = Vec::with_capacity(count);
-                for _ in 0..count {
-                    child_inputs.push(Default::default())
-                }
-
-                inputs.push(child_inputs.clone());
-                proxies.push(Arc::new(ProxyExecutionPlan {
-                    inner: child,
-                    inputs: child_inputs,
-                }));
-            }
-
-            splice_point.with_new_children(proxies)?
-        } else {
-            splice_point.clone()
-        };
-
-        // Reconstruct the parent graph
-        let mut proxied = spliced;
-        for parent in parent_plans.into_iter().rev() {
-            proxied = parent.with_new_children(vec![proxied])?
-        }
-
-        // Construct the output streams
-        let output_count = proxied.output_partitioning().partition_count();
-        let outputs = (0..output_count)
-            .map(|x| proxied.execute(x, task_context.clone()).map(Mutex::new))
-            .collect::<Result<_>>()?;
-
-        Ok(Self {
-            proxied,
-            inputs,
-            outputs,
-        })
-    }
-}
-
-impl Pipeline for ExecutionPipeline {
-    /// Push a [`RecordBatch`] to the given input partition
-    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
-        let mut partition = self.inputs[child][partition].lock();
-        assert!(!partition.is_closed);
-
-        partition.buffer.push_back(input);
-        for waker in partition.wait_list.drain(..) {
-            waker.wake()
-        }
-        Ok(())
-    }
-
-    fn close(&self, child: usize, partition: usize) {
-        let mut partition = self.inputs[child][partition].lock();
-        assert!(!partition.is_closed);
-
-        partition.is_closed = true;
-        for waker in partition.wait_list.drain(..) {
-            waker.wake()
-        }
-    }
-
-    fn output_partitions(&self) -> usize {
-        self.outputs.len()
-    }
-
-    /// Poll an output partition, attempting to get its output
-    fn poll_partition(
-        &self,
-        cx: &mut Context<'_>,
-        partition: usize,
-    ) -> Poll<Option<Result<RecordBatch>>> {
-        self.outputs[partition]
-            .lock()
-            .poll_next_unpin(cx)
-            .map(|opt| opt.map(|r| r.map_err(Into::into)))
-    }
-}
-
-#[derive(Debug, Default)]
-struct InputPartition {
-    buffer: VecDeque<RecordBatch>,
-    wait_list: Vec<Waker>,
-    is_closed: bool,
-}
-
-struct InputPartitionStream {
-    schema: SchemaRef,
-    partition: Arc<Mutex<InputPartition>>,
-}
-
-impl Stream for InputPartitionStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let mut partition = self.partition.lock();
-        match partition.buffer.pop_front() {
-            Some(batch) => Poll::Ready(Some(Ok(batch))),
-            None if partition.is_closed => Poll::Ready(None),
-            _ => {
-                partition.wait_list.push(cx.waker().clone());
-                Poll::Pending
-            }
-        }
-    }
-}
-
-impl RecordBatchStream for InputPartitionStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-/// This is a hack that allows injecting [`InputPartitionStream`] in place of the
-/// streams yielded by the child of the wrapped [`ExecutionPlan`]
-///
-/// This is hopefully temporary pending reworking [`ExecutionPlan`]
-#[derive(Debug)]
-struct ProxyExecutionPlan {
-    inner: Arc<dyn ExecutionPlan>,
-
-    inputs: Vec<Arc<Mutex<InputPartition>>>,
-}
-
-impl ExecutionPlan for ProxyExecutionPlan {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.inner.schema()
-    }
-
-    fn output_partitioning(&self) -> Partitioning {
-        self.inner.output_partitioning()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.inner.output_ordering()
-    }
-
-    fn required_input_distribution(&self) -> Vec<Distribution> {
-        self.inner.required_input_distribution()
-    }
-
-    fn maintains_input_order(&self) -> Vec<bool> {
-        self.inner.maintains_input_order()
-    }
-
-    fn benefits_from_input_partitioning(&self) -> bool {
-        self.inner.benefits_from_input_partitioning()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        unimplemented!()
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        _context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(InputPartitionStream {
-            schema: self.schema(),
-            partition: self.inputs[partition].clone(),
-        }))
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        self.inner.metrics()
-    }
-
-    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "ProxyExecutionPlan")
-    }
-
-    fn statistics(&self) -> Statistics {
-        self.inner.statistics()
-    }
-}
-
-struct NodeDescriptor {
-    operator: String,
-    children: Vec<NodeDescriptor>,
-}
-
-impl std::fmt::Debug for NodeDescriptor {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct(&self.operator)
-            .field("children", &self.children)
-            .finish()
-    }
-}
-
-fn debug_tree(plan: &dyn ExecutionPlan) -> NodeDescriptor {
-    let operator = format!("{}", displayable(plan).one_line());
-    let children = plan
-        .children()
-        .into_iter()
-        .map(|x| debug_tree(x.as_ref()))
-        .collect();
-
-    NodeDescriptor { operator, children }
-}
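
The splice performed by `ExecutionPipeline::new` above is the subtle part of this file, so here is that walk-and-rebuild step in isolation. A hedged sketch using only the `children()` / `with_new_children()` calls visible in the removed code; the free function `splice` is illustrative scaffolding, not part of any API, and assumes the `ExecutionPlan` and `Result` imports from the module above:

    use std::sync::Arc;

    /// Walk `depth` single-child nodes down from `plan`, swap `replacement`
    /// in at the splice point, then rebuild the chain of parents above it.
    fn splice(
        plan: Arc<dyn ExecutionPlan>,
        depth: usize,
        replacement: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut parents = Vec::with_capacity(depth);
        let mut node = plan;
        for _ in 0..depth {
            let children = node.children();
            assert_eq!(children.len(), 1, "can only splice through single-child nodes");
            parents.push(node);
            node = children.into_iter().next().unwrap();
        }
        // Reconstruct the parent graph from the splice point back to the root.
        let mut rebuilt = replacement;
        for parent in parents.into_iter().rev() {
            rebuilt = parent.with_new_children(vec![rebuilt])?;
        }
        Ok(rebuilt)
    }
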
diff --git a/datafusion/core/src/scheduler/pipeline/mod.rs b/datafusion/core/src/scheduler/pipeline/mod.rs
deleted file mode 100644
index c1838cd1a4..0000000000
--- a/datafusion/core/src/scheduler/pipeline/mod.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::task::{Context, Poll};
-
-use arrow::record_batch::RecordBatch;
-
-use crate::error::Result;
-
-pub mod execution;
-pub mod repartition;
-
-/// A push-based interface used by the scheduler to drive query execution
-///
-/// A pipeline processes data from one or more input partitions, producing output
-/// to one or more output partitions. As a [`Pipeline`] may draw on input from
-/// more than one upstream [`Pipeline`], input partitions are identified by both
-/// a child index and a partition index, whereas output partitions are only
-/// identified by a partition index.
-///
-/// This is not intended as an eventual replacement for the physical plan representation
-/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
-/// parts of the physical plan are "compiled" into by the scheduler.
-///
-/// # Eager vs Lazy Execution
-///
-/// Whether computation is eagerly done on push, or lazily done on pull, is
-/// intentionally left as an implementation detail of the [`Pipeline`]
-///
-/// This allows flexibility to support the following different patterns, and potentially more:
-///
-/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
-/// and immediately wakes the corresponding output partition.
-///
-/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
-/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
-/// the job completes. Order and non-order preserving variants are possible
-///
-/// A merge pipeline which combines data from one or more input partitions into one or
-/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
-/// any output partitions that may now be able to make progress. This may be none if
-/// the operator is waiting on data from a different input partition
-///
-/// An aggregation pipeline which combines data from one or more input partitions into
-/// a single output partition. [`Pipeline::push`] would eagerly update the computed
-/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output.
-/// It would also be possible to flush once the partial aggregates reach a certain size
-///
-/// A partition-aware aggregation pipeline, which functions similarly to the above, but
-/// computes aggregations per input partition, before combining these prior to flush.
-///
-/// An async input pipeline, which has no inputs, and wakes the output partition
-/// whenever new data is available
-///
-/// A JIT compiled sequence of synchronous operators, that perform multiple operations
-/// from the physical plan as a single [`Pipeline`]. Parallelized implementations
-/// are also possible
-///
-/// [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
-pub trait Pipeline: Send + Sync + std::fmt::Debug {
-    /// Push a [`RecordBatch`] to the given input partition
-    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;
-
-    /// Mark an input partition as exhausted
-    fn close(&self, child: usize, partition: usize);
-
-    /// Returns the number of output partitions
-    fn output_partitions(&self) -> usize;
-
-    /// Attempt to pull out the next value of the given output partition, registering the
-    /// current task for wakeup if the value is not yet available, and returning `None`
-    /// if the output partition is exhausted and will never yield any further values
-    ///
-    /// # Return value
-    ///
-    /// There are several possible return values:
-    ///
-    /// - `Poll::Pending` indicates that this partition's next value is not ready yet.
-    /// Implementations should use the waker provided by `cx` to notify the scheduler when
-    /// progress can be made
-    ///
-    /// - `Poll::Ready(Some(Ok(val)))` returns the next value from this output partition,
-    /// the output partition should be polled again as it may have further values. The returned
-    /// value will be routed to the next pipeline in the query
-    ///
-    /// - `Poll::Ready(Some(Err(e)))` returns an error that will be routed to the query's output
-    /// and the query execution aborted.
-    ///
-    /// - `Poll::Ready(None)` indicates that this partition is exhausted and will not produce any
-    /// further values.
-    ///
-    fn poll_partition(
-        &self,
-        cx: &mut Context<'_>,
-        partition: usize,
-    ) -> Poll<Option<Result<RecordBatch>>>;
-}
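
To make the trait contract above concrete, here is a minimal sketch of an implementation: a single-input, single-output passthrough that buffers pushed batches and wakes any pending consumer, following the same buffer-plus-wait-list pattern the removed execution.rs and repartition.rs files use. It assumes the `Pipeline` trait and the `RecordBatch` / `Result` imports from this module; a sketch, not code that shipped:

    use std::collections::VecDeque;
    use std::task::{Context, Poll, Waker};

    use parking_lot::Mutex;

    #[derive(Debug, Default)]
    struct PassthroughState {
        buffer: VecDeque<RecordBatch>,
        wait_list: Vec<Waker>,
        closed: bool,
    }

    /// One input partition in, one output partition out, no transformation.
    #[derive(Debug, Default)]
    struct PassthroughPipeline {
        state: Mutex<PassthroughState>,
    }

    impl Pipeline for PassthroughPipeline {
        fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
            assert_eq!((child, partition), (0, 0));
            let mut state = self.state.lock();
            state.buffer.push_back(input);
            // Wake any consumer currently parked in poll_partition.
            for waker in state.wait_list.drain(..) {
                waker.wake()
            }
            Ok(())
        }

        fn close(&self, child: usize, partition: usize) {
            assert_eq!((child, partition), (0, 0));
            let mut state = self.state.lock();
            state.closed = true;
            for waker in state.wait_list.drain(..) {
                waker.wake()
            }
        }

        fn output_partitions(&self) -> usize {
            1
        }

        fn poll_partition(
            &self,
            cx: &mut Context<'_>,
            partition: usize,
        ) -> Poll<Option<Result<RecordBatch>>> {
            assert_eq!(partition, 0);
            let mut state = self.state.lock();
            match state.buffer.pop_front() {
                Some(batch) => Poll::Ready(Some(Ok(batch))),
                None if state.closed => Poll::Ready(None),
                None => {
                    state.wait_list.push(cx.waker().clone());
                    Poll::Pending
                }
            }
        }
    }
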
diff --git a/datafusion/core/src/scheduler/pipeline/repartition.rs b/datafusion/core/src/scheduler/pipeline/repartition.rs
deleted file mode 100644
index 7eeb3c31de..0000000000
--- a/datafusion/core/src/scheduler/pipeline/repartition.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::VecDeque;
-use std::task::{Context, Poll, Waker};
-
-use parking_lot::Mutex;
-
-use crate::arrow::record_batch::RecordBatch;
-use crate::error::Result;
-use crate::physical_plan::repartition::BatchPartitioner;
-use crate::physical_plan::Partitioning;
-
-use crate::scheduler::pipeline::Pipeline;
-
-/// A [`Pipeline`] that can repartition its input
-#[derive(Debug)]
-pub struct RepartitionPipeline {
-    output_count: usize,
-    state: Mutex<RepartitionState>,
-}
-
-impl RepartitionPipeline {
-    /// Create a new [`RepartitionPipeline`] with the given `input` and `output` partitioning
-    pub fn try_new(input: Partitioning, output: Partitioning) -> Result<Self> {
-        let input_count = input.partition_count();
-        let output_count = output.partition_count();
-        assert_ne!(input_count, 0);
-        assert_ne!(output_count, 0);
-
-        // TODO: metrics support
-        let partitioner = BatchPartitioner::try_new(output, Default::default())?;
-
-        let state = Mutex::new(RepartitionState {
-            partitioner,
-            partition_closed: vec![false; input_count],
-            input_closed: false,
-            output_buffers: (0..output_count).map(|_| Default::default()).collect(),
-        });
-
-        Ok(Self {
-            state,
-            output_count,
-        })
-    }
-}
-
-struct RepartitionState {
-    partitioner: BatchPartitioner,
-    partition_closed: Vec<bool>,
-    input_closed: bool,
-    output_buffers: Vec<OutputBuffer>,
-}
-
-impl std::fmt::Debug for RepartitionState {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RepartitionState")
-            .field("partition_closed", &self.partition_closed)
-            .field("input_closed", &self.input_closed)
-            .finish()
-    }
-}
-
-impl Pipeline for RepartitionPipeline {
-    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
-        assert_eq!(child, 0);
-
-        let mut state = self.state.lock();
-        assert!(
-            !state.partition_closed[partition],
-            "attempt to push to closed partition {partition} of RepartitionPipeline({state:?})"
-        );
-
-        let state = &mut *state;
-        state.partitioner.partition(input, |partition, batch| {
-            state.output_buffers[partition].push_batch(batch);
-            Ok(())
-        })
-    }
-
-    fn close(&self, child: usize, partition: usize) {
-        assert_eq!(child, 0);
-
-        let mut state = self.state.lock();
-        assert!(
-            !state.partition_closed[partition],
-            "attempt to close already closed partition {partition} of RepartitionPipeline({state:?})"
-        );
-
-        state.partition_closed[partition] = true;
-
-        // If all input streams exhausted, wake outputs
-        if state.partition_closed.iter().all(|x| *x) {
-            state.input_closed = true;
-            for buffer in &mut state.output_buffers {
-                for waker in buffer.wait_list.drain(..) {
-                    waker.wake()
-                }
-            }
-        }
-    }
-
-    fn output_partitions(&self) -> usize {
-        self.output_count
-    }
-
-    fn poll_partition(
-        &self,
-        cx: &mut Context<'_>,
-        partition: usize,
-    ) -> Poll<Option<Result<RecordBatch>>> {
-        let mut state = self.state.lock();
-        let input_closed = state.input_closed;
-        let buffer = &mut state.output_buffers[partition];
-
-        match buffer.batches.pop_front() {
-            Some(batch) => Poll::Ready(Some(Ok(batch))),
-            None if input_closed => Poll::Ready(None),
-            _ => {
-                buffer.wait_list.push(cx.waker().clone());
-                Poll::Pending
-            }
-        }
-    }
-}
-
-#[derive(Debug, Default)]
-struct OutputBuffer {
-    batches: VecDeque<RecordBatch>,
-    wait_list: Vec<Waker>,
-}
-
-impl OutputBuffer {
-    fn push_batch(&mut self, batch: RecordBatch) {
-        self.batches.push_back(batch);
-
-        for waker in self.wait_list.drain(..) {
-            waker.wake()
-        }
-    }
-}
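
For a sense of how the pieces above fit together, a hedged usage sketch: fan a single input partition out to four round-robin output partitions. It assumes the `Partitioning`, `RecordBatch`, and `Result` imports at the top of the removed file; the `demo` function and its `batch` argument are hypothetical scaffolding:

    fn demo(batch: RecordBatch) -> Result<()> {
        let pipeline = RepartitionPipeline::try_new(
            Partitioning::RoundRobinBatch(1), // input: one partition
            Partitioning::RoundRobinBatch(4), // output: four partitions
        )?;
        assert_eq!(pipeline.output_partitions(), 4);

        // Batches pushed to (child 0, partition 0) are dealt round-robin
        // across the four output buffers; close() marks the input exhausted,
        // waking every output so poll_partition can return Ready(None)
        // once its buffer drains.
        pipeline.push(batch, 0, 0)?;
        pipeline.close(0, 0);
        Ok(())
    }
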
diff --git a/datafusion/core/src/scheduler/plan.rs b/datafusion/core/src/scheduler/plan.rs
deleted file mode 100644
index b5a786a322..0000000000
--- a/datafusion/core/src/scheduler/plan.rs
+++ /dev/null
@@ -1,296 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::datatypes::SchemaRef;
-use std::sync::Arc;
-
-use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::{ExecutionPlan, Partitioning};
-
-use crate::scheduler::pipeline::{
-    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
-};
-
-/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct OutputLink {
-    /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to
-    pub pipeline: usize,
-
-    /// The child of the [`Pipeline`] to route output to
-    pub child: usize,
-}
-
-/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
-#[derive(Debug)]
-pub struct RoutablePipeline {
-    /// The pipeline that produces data
-    pub pipeline: Box<dyn Pipeline>,
-
-    /// Where to send the output of `pipeline`
-    ///
-    /// If `None`, the output should be sent to the query output
-    pub output: Option<OutputLink>,
-}
-
-/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to
-/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
-/// necessary to route output from one stage to the next
-#[derive(Debug)]
-pub struct PipelinePlan {
-    /// Schema of this plan's output
-    pub schema: SchemaRef,
-
-    /// Number of output partitions
-    pub output_partitions: usize,
-
-    /// Pipelines that comprise this plan
-    pub pipelines: Vec<RoutablePipeline>,
-}
-
-/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
-/// together multiple operators; [`OperatorGroup`] stores this state
-struct OperatorGroup {
-    /// Where to route the output of the eventual [`Pipeline`]
-    output: Option<OutputLink>,
-
-    /// The [`ExecutionPlan`] from which to start recursing
-    root: Arc<dyn ExecutionPlan>,
-
-    /// The number of times to recurse into the [`ExecutionPlan`]'s children
-    depth: usize,
-}
-
-/// A utility struct to assist converting from [`ExecutionPlan`] to [`PipelinePlan`]
-///
-/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building
-/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited depth-first,
-/// a node is visited only after its parent has been.
-pub struct PipelinePlanner {
-    task_context: Arc<TaskContext>,
-
-    /// The schema of this plan
-    schema: SchemaRef,
-
-    /// The number of output partitions of this plan
-    output_partitions: usize,
-
-    /// The current list of completed pipelines
-    completed: Vec<RoutablePipeline>,
-
-    /// A list of [`ExecutionPlan`] still to visit, along with
-    /// where they should route their output
-    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
-
-    /// Stores one or more operators to combine
-    /// together into a single [`ExecutionPipeline`]
-    execution_operators: Option<OperatorGroup>,
-}
-
-impl PipelinePlanner {
-    pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self {
-        let schema = plan.schema();
-        let output_partitions = plan.output_partitioning().partition_count();
-        Self {
-            completed: vec![],
-            to_visit: vec![(plan, None)],
-            task_context,
-            execution_operators: None,
-            schema,
-            output_partitions,
-        }
-    }
-
-    /// Flush the current group of operators stored in `execution_operators`
-    /// into a single [`ExecutionPipeline`]
-    fn flush_exec(&mut self) -> Result<usize> {
-        let group = self.execution_operators.take().unwrap();
-        let node_idx = self.completed.len();
-        self.completed.push(RoutablePipeline {
-            pipeline: Box::new(ExecutionPipeline::new(
-                group.root,
-                self.task_context.clone(),
-                group.depth,
-            )?),
-            output: group.output,
-        });
-        Ok(node_idx)
-    }
-
-    /// Visit a non-special cased [`ExecutionPlan`]
-    fn visit_exec(
-        &mut self,
-        plan: Arc<dyn ExecutionPlan>,
-        parent: Option<OutputLink>,
-    ) -> Result<()> {
-        let children = plan.children();
-
-        // Add the operator to the current group of operators to be combined
-        // into a single [`ExecutionPipeline`].
-        //
-        // TODO: More sophisticated policy, just because we can combine them doesn't mean we should
-        match self.execution_operators.as_mut() {
-            Some(buffer) => {
-                assert_eq!(parent, buffer.output, "QueryBuilder out of sync");
-                buffer.depth += 1;
-            }
-            None => {
-                self.execution_operators = Some(OperatorGroup {
-                    output: parent,
-                    root: plan,
-                    depth: 0,
-                })
-            }
-        }
-
-        match children.len() {
-            1 => {
-                // Enqueue the single child with the parent of the `OperatorGroup`
-                self.to_visit
-                    .push((children.into_iter().next().unwrap(), parent))
-            }
-            _ => {
-                // We can only recursively group through nodes with a single child;
-                // if this node has zero or multiple children, we must flush the buffer
-                // and enqueue its children with this new pipeline as their parent
-                let node = self.flush_exec()?;
-                self.enqueue_children(children, node);
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Add the given list of children to the stack of [`ExecutionPlan`] to visit
-    fn enqueue_children(
-        &mut self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-        parent_node_idx: usize,
-    ) {
-        for (child_idx, child) in children.into_iter().enumerate() {
-            self.to_visit.push((
-                child,
-                Some(OutputLink {
-                    pipeline: parent_node_idx,
-                    child: child_idx,
-                }),
-            ))
-        }
-    }
-
-    /// Push a new [`RoutablePipeline`] and enqueue its children to be visited
-    fn push_pipeline(
-        &mut self,
-        node: RoutablePipeline,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) {
-        let node_idx = self.completed.len();
-        self.completed.push(node);
-        self.enqueue_children(children, node_idx)
-    }
-
-    /// Push a new [`RepartitionPipeline`], first flushing any buffered [`OperatorGroup`]
-    fn push_repartition(
-        &mut self,
-        input: Partitioning,
-        output: Partitioning,
-        parent: Option<OutputLink>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<()> {
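-        // Flush any buffered operators into their own pipeline; the repartition
-        // then routes its output into that pipeline as its only child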
-        let parent = match &self.execution_operators {
-            Some(buffer) => {
-                assert_eq!(buffer.output, parent, "PipelinePlanner out of sync");
-                Some(OutputLink {
-                    pipeline: self.flush_exec()?,
-                    child: 0, // Must be the only child
-                })
-            }
-            None => parent,
-        };
-
-        let node = Box::new(RepartitionPipeline::try_new(input, output)?);
-        self.push_pipeline(
-            RoutablePipeline {
-                pipeline: node,
-                output: parent,
-            },
-            children,
-        );
-        Ok(())
-    }
-
-    /// Visit an [`ExecutionPlan`] operator and add it to the [`PipelinePlan`] being built
-    fn visit_operator(
-        &mut self,
-        plan: Arc<dyn ExecutionPlan>,
-        parent: Option<OutputLink>,
-    ) -> Result<()> {
-        if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
-            self.push_repartition(
-                repartition.input().output_partitioning(),
-                repartition.output_partitioning(),
-                parent,
-                repartition.children(),
-            )
-        } else if let Some(coalesce) =
-            plan.as_any().downcast_ref::<CoalescePartitionsExec>()
-        {
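-            // Model the coalesce as a repartition of its input down to a
-            // single round-robin partition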
-            self.push_repartition(
-                coalesce.input().output_partitioning(),
-                Partitioning::RoundRobinBatch(1),
-                parent,
-                coalesce.children(),
-            )
-        } else {
-            self.visit_exec(plan, parent)
-        }
-    }
-
-    /// Build a [`PipelinePlan`] from the [`ExecutionPlan`] provided to [`PipelinePlanner::new`]
-    ///
-    /// This will group as many operators as possible into a single [`ExecutionPipeline`], only
-    /// creating new pipelines when:
-    ///
-    /// - encountering an operator with multiple children
-    /// - encountering a repartitioning operator
-    ///
-    /// The latter case arises because the repartitioning operators in DataFusion are
-    /// currently coupled to the non-scheduler-based parallelism model
-    ///
-    /// The above logic is liable to change, is considered an implementation detail of the
-    /// scheduler, and should not be relied upon by operators
-    ///
-    pub fn build(mut self) -> Result<PipelinePlan> {
-        // We do a depth-first scan of the operator tree, extracting a list of [`RoutablePipeline`]
-        while let Some((plan, parent)) = self.to_visit.pop() {
-            self.visit_operator(plan, parent)?;
-        }
-
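-        // Flush any operators still buffered into a final pipeline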
-        if self.execution_operators.is_some() {
-            self.flush_exec()?;
-        }
-
-        Ok(PipelinePlan {
-            schema: self.schema,
-            output_partitions: self.output_partitions,
-            pipelines: self.completed,
-        })
-    }
-}
diff --git a/datafusion/core/src/scheduler/task.rs b/datafusion/core/src/scheduler/task.rs
deleted file mode 100644
index bf10186ef2..0000000000
--- a/datafusion/core/src/scheduler/task.rs
+++ /dev/null
@@ -1,509 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::error::{DataFusionError, Result};
-use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use crate::scheduler::{
-    is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo, RoutablePipeline,
-    Spawner,
-};
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use futures::channel::mpsc;
-use futures::task::ArcWake;
-use futures::{ready, Stream, StreamExt};
-use log::{debug, trace};
-use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Weak};
-use std::task::{Context, Poll};
-
-/// Spawns a [`PipelinePlan`] using the provided [`Spawner`]
-pub(crate) fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults {
-    debug!("Spawning pipeline plan: {:#?}", plan);
-
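-    // Create one unbounded channel per output partition: the senders carry the
-    // query output, and the receivers back the streams returned to the caller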
-    let (senders, receivers) = (0..plan.output_partitions)
-        .map(|_| mpsc::unbounded())
-        .unzip::<_, _, Vec<_>, Vec<_>>();
-
-    let context = Arc::new(ExecutionContext {
-        spawner,
-        pipelines: plan.pipelines,
-        schema: plan.schema,
-        output: senders,
-    });
-
-    for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() {
-        for partition in 0..query_pipeline.pipeline.output_partitions() {
-            context.spawner.spawn(Task {
-                context: context.clone(),
-                waker: Arc::new(TaskWaker {
-                    context: Arc::downgrade(&context),
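-                    // Starts at 1 because the task is about to be enqueued;
-                    // see [`TaskWaker`] for how this prevents double-scheduling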
-                    wake_count: AtomicUsize::new(1),
-                    pipeline: pipeline_idx,
-                    partition,
-                }),
-            });
-        }
-    }
-
-    let partitions = receivers
-        .into_iter()
-        .map(|receiver| ExecutionResultStream {
-            receiver,
-            context: context.clone(),
-        })
-        .collect();
-
-    ExecutionResults {
-        streams: partitions,
-        context,
-    }
-}
-
-/// A [`Task`] identifies an output partition within a given pipeline that may be able to
-/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
-/// [`Task`] and distributes them amongst its worker threads.
-pub struct Task {
-    /// Maintain a link to the [`ExecutionContext`]; this is necessary to be able to
-    /// route the output of the partition to its destination
-    context: Arc<ExecutionContext>,
-
-    /// An [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
-    waker: Arc<TaskWaker>,
-}
-
-impl std::fmt::Debug for Task {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let output = &self.context.pipelines[self.waker.pipeline].output;
-
-        f.debug_struct("Task")
-            .field("pipeline", &self.waker.pipeline)
-            .field("partition", &self.waker.partition)
-            .field("output", &output)
-            .finish()
-    }
-}
-
-impl Task {
-    fn handle_error(
-        &self,
-        partition: usize,
-        routable: &RoutablePipeline,
-        error: DataFusionError,
-    ) {
-        match routable.output {
-            Some(link) => {
-                // The query output partitioning may not match the current pipeline's,
-                // but the query output has at least one partition, so send the error
-                // to the first partition of the query output.
-                self.context.send_query_output(0, Err(error));
-
-                trace!(
-                    "Closing pipeline: {:?}, partition: {}, due to error",
-                    link,
-                    self.waker.partition,
-                );
-
-                self.context.pipelines[link.pipeline]
-                    .pipeline
-                    .close(link.child, self.waker.partition);
-            }
-            None => self.context.send_query_output(partition, Err(error)),
-        }
-    }
-
-    /// Call [`Pipeline::poll_partition`][super::pipeline::Pipeline::poll_partition],
-    /// attempting to make progress on query execution
-    pub fn do_work(self) {
-        assert!(is_worker(), "Task::do_work called outside of worker pool");
-        if self.context.is_cancelled() {
-            return;
-        }
-
-        // Capture the wake count prior to calling [`Pipeline::poll_partition`];
-        // this allows us to detect concurrent wake ups and handle them correctly
-        let wake_count = self.waker.wake_count.load(Ordering::SeqCst);
-
-        let node = self.waker.pipeline;
-        let partition = self.waker.partition;
-
-        let waker = futures::task::waker_ref(&self.waker);
-        let mut cx = Context::from_waker(&waker);
-
-        let pipelines = &self.context.pipelines;
-        let routable = &pipelines[node];
-        match routable.pipeline.poll_partition(&mut cx, partition) {
-            Poll::Ready(Some(Ok(batch))) => {
-                trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
-                match routable.output {
-                    Some(link) => {
-                        trace!(
-                            "Publishing batch to pipeline {:?} partition {}",
-                            link,
-                            partition
-                        );
-
-                        let r = pipelines[link.pipeline]
-                            .pipeline
-                            .push(batch, link.child, partition);
-
-                        if let Err(e) = r {
-                            self.handle_error(partition, routable, e);
-
-                            // Return without rescheduling this output again
-                            return;
-                        }
-                    }
-                    None => {
-                        trace!("Publishing batch to output");
-                        self.context.send_query_output(partition, Ok(batch))
-                    }
-                }
-
-                // Reschedule this pipeline again
-                //
-                // We want to prioritise running tasks triggered by the most recent
-                // batch, so reschedule with FIFO ordering
-                //
-                // Note: we must schedule after we have routed the batch; otherwise
-                // we introduce a potential ordering race where the newly scheduled
-                // task runs before this task finishes routing the output
-                spawn_local_fifo(self);
-            }
-            Poll::Ready(Some(Err(e))) => {
-                trace!("Poll {:?}: Error: {:?}", self, e);
-                self.handle_error(partition, routable, e)
-            }
-            Poll::Ready(None) => {
-                trace!("Poll {:?}: None", self);
-                match routable.output {
-                    Some(link) => {
-                        trace!("Closing pipeline: {:?}, partition: {}", link, partition);
-                        pipelines[link.pipeline]
-                            .pipeline
-                            .close(link.child, partition)
-                    }
-                    None => self.context.finish(partition),
-                }
-            }
-            Poll::Pending => {
-                trace!("Poll {:?}: Pending", self);
-                // Attempt to reset the wake count to zero, expecting it to still
-                // hold the value captured prior to calling [`Pipeline::poll_partition`].
-                //
-                // If this fails, it indicates a wakeup was received whilst executing
-                // [`Pipeline::poll_partition`], and we should reschedule the task
-                let reset = self.waker.wake_count.compare_exchange(
-                    wake_count,
-                    0,
-                    Ordering::SeqCst,
-                    Ordering::SeqCst,
-                );
-
-                if reset.is_err() {
-                    trace!("Wakeup triggered whilst polling: {:?}", self);
-                    spawn_local(self);
-                }
-            }
-        }
-    }
-}
-
-/// The results of the execution of a query
-pub struct ExecutionResults {
-    /// [`ExecutionResultStream`] for each partition of this query
-    streams: Vec<ExecutionResultStream>,
-
-    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
-    context: Arc<ExecutionContext>,
-}
-
-impl ExecutionResults {
-    /// Returns a [`SendableRecordBatchStream`] of this execution
-    ///
-    /// In the event of multiple output partitions, the output will be interleaved
-    pub fn stream(self) -> SendableRecordBatchStream {
-        let schema = self.context.schema.clone();
-        Box::pin(RecordBatchStreamAdapter::new(
-            schema,
-            futures::stream::select_all(self.streams),
-        ))
-    }
-
-    /// Returns a [`SendableRecordBatchStream`] for each partition of this execution
-    pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> {
-        self.streams.into_iter().map(|s| Box::pin(s) as _).collect()
-    }
-}
-
-/// A result stream for the execution of a query
-struct ExecutionResultStream {
-    receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>,
-
-    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
-    context: Arc<ExecutionContext>,
-}
-
-impl Stream for ExecutionResultStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
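-        // The channel carries `Some(result)` for batches and errors, and `None`
-        // as the end-of-partition marker; `flatten` maps both that marker and a
-        // closed channel to end-of-stream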
-        let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten();
-        Poll::Ready(opt)
-    }
-}
-
-impl RecordBatchStream for ExecutionResultStream {
-    fn schema(&self) -> SchemaRef {
-        self.context.schema.clone()
-    }
-}
-
-/// The shared state of all [`Task`] created from the same [`PipelinePlan`]
-#[derive(Debug)]
-struct ExecutionContext {
-    /// Spawner for this query
-    spawner: Spawner,
-
-    /// List of pipelines that belong to this query; pipelines are addressed
-    /// by their index within this list
-    pipelines: Vec<RoutablePipeline>,
-
-    /// Schema of this plan's output
-    pub schema: SchemaRef,
-
-    /// The output streams, per partition, for this query's execution
-    output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>,
-}
-
-impl Drop for ExecutionContext {
-    fn drop(&mut self) {
-        debug!("ExecutionContext dropped");
-    }
-}
-
-impl ExecutionContext {
-    /// Returns `true` if this query has been dropped, specifically if the
-    /// stream returned by [`super::Scheduler::schedule`] has been dropped
-    fn is_cancelled(&self) -> bool {
-        self.output.iter().all(|x| x.is_closed())
-    }
-
-    /// Sends `output` to this query's output stream
-    fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) {
-        debug_assert!(
-            self.output.len() > partition,
-            "the specified partition exceeds the total number of output partitions"
-        );
-        let _ = self.output[partition].unbounded_send(Some(output));
-    }
-
-    /// Mark this partition as finished
-    fn finish(&self, partition: usize) {
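-        // `None` is the end-of-partition marker consumed by `ExecutionResultStream`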
-        let _ = self.output[partition].unbounded_send(None);
-    }
-}
-
-struct TaskWaker {
-    /// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this
-    /// [`Waker`] is stored within a `Pipeline` owned by the [`ExecutionContext`]
-    ///
-    /// [`Waker`]: std::task::Waker
-    context: Weak<ExecutionContext>,
-
-    /// A counter that stores the number of times this has been awoken
-    ///
-    /// A value > 0 implies the task is either in the ready queue or
-    /// currently being executed
-    ///
-    /// `TaskWaker::wake` always increments the `wake_count`; however, it only
-    /// re-enqueues the [`Task`] if the value prior to the increment was 0
-    ///
-    /// This ensures that a given [`Task`] is not enqueued multiple times
-    ///
-    /// We store an integer, as opposed to a boolean, so that wake ups that
-    /// occur during `Pipeline::poll_partition` can be detected and handled
-    /// after it has finished executing
-    wake_count: AtomicUsize,
-
-    /// The index of the pipeline within the [`ExecutionContext`] to poll
-    pipeline: usize,
-
-    /// The partition of that pipeline to poll
-    partition: usize,
-}
-
-impl ArcWake for TaskWaker {
-    fn wake(self: Arc<Self>) {
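-        // `fetch_add` returns the previous count; if it was non-zero the task is
-        // already queued or running, so record the wake without enqueueing again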
-        if self.wake_count.fetch_add(1, Ordering::SeqCst) != 0 {
-            trace!("Ignoring duplicate wakeup");
-            return;
-        }
-
-        if let Some(context) = self.context.upgrade() {
-            let task = Task {
-                context,
-                waker: self.clone(),
-            };
-
-            trace!("Wakeup {:?}", task);
-
-            // If called from a worker, spawn to the current worker's
-            // local queue; otherwise reschedule on any worker
-            match is_worker() {
-                true => spawn_local(task),
-                false => task.context.spawner.clone().spawn(task),
-            }
-        } else {
-            trace!("Dropped wakeup");
-        }
-    }
-
-    fn wake_by_ref(s: &Arc<Self>) {
-        ArcWake::wake(s.clone())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::error::Result;
-    use crate::scheduler::{pipeline::Pipeline, plan::RoutablePipeline, Scheduler};
-    use arrow::array::{ArrayRef, Int32Array};
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow::record_batch::RecordBatch;
-    use futures::{channel::oneshot, ready, FutureExt, StreamExt};
-    use parking_lot::Mutex;
-    use std::fmt::Debug;
-    use std::time::Duration;
-
-    /// Tests that the waker can be sent to a tokio pool
-    #[derive(Debug)]
-    struct TokioPipeline {
-        handle: tokio::runtime::Handle,
-        state: Mutex<State>,
-    }
-
-    #[derive(Debug)]
-    enum State {
-        Init,
-        Wait(oneshot::Receiver<Result<RecordBatch>>),
-        Finished,
-    }
-
-    impl Default for State {
-        fn default() -> Self {
-            Self::Init
-        }
-    }
-
-    impl Pipeline for TokioPipeline {
-        fn push(
-            &self,
-            _input: RecordBatch,
-            _child: usize,
-            _partition: usize,
-        ) -> Result<()> {
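-            // This pipeline is a source with no child pipelines, so nothing
-            // ever pushes into it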
-            unreachable!()
-        }
-
-        fn close(&self, _child: usize, _partition: usize) {}
-
-        fn output_partitions(&self) -> usize {
-            1
-        }
-
-        fn poll_partition(
-            &self,
-            cx: &mut Context<'_>,
-            _partition: usize,
-        ) -> Poll<Option<Result<RecordBatch>>> {
-            let mut state = self.state.lock();
-            loop {
-                match &mut *state {
-                    State::Init => {
-                        let (sender, receiver) = oneshot::channel();
-                        self.handle.spawn(async move {
-                            tokio::time::sleep(Duration::from_millis(10)).await;
-                            let array = Int32Array::from_iter_values([1, 2, 3]);
-                            sender.send(
-                                RecordBatch::try_from_iter([(
-                                    "int",
-                                    Arc::new(array) as ArrayRef,
-                                )])
-                                .map_err(DataFusionError::ArrowError),
-                            )
-                        });
-                        *state = State::Wait(receiver)
-                    }
-                    State::Wait(r) => {
-                        let v = ready!(r.poll_unpin(cx)).ok();
-                        *state = State::Finished;
-                        return Poll::Ready(v);
-                    }
-                    State::Finished => return Poll::Ready(None),
-                }
-            }
-        }
-    }
-
-    #[test]
-    fn test_tokio_waker() {
-        let scheduler = Scheduler::new(2);
-
-        // A tokio runtime
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_time()
-            .build()
-            .unwrap();
-
-        // A pipeline that dispatches to a tokio worker
-        let pipeline = TokioPipeline {
-            handle: runtime.handle().clone(),
-            state: Default::default(),
-        };
-
-        let plan = PipelinePlan {
-            schema: Arc::new(Schema::new(vec![Field::new(
-                "int",
-                DataType::Int32,
-                false,
-            )])),
-            output_partitions: 1,
-            pipelines: vec![RoutablePipeline {
-                pipeline: Box::new(pipeline),
-                output: None,
-            }],
-        };
-
-        let mut receiver = scheduler.schedule_plan(plan).stream();
-
-        runtime.block_on(async move {
-            // Should wait for output
-            let batch = receiver.next().await.unwrap().unwrap();
-            assert_eq!(batch.num_rows(), 3);
-
-            // Next batch should be none
-            assert!(receiver.next().await.is_none());
-        })
-    }
-}