Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/14 20:55:46 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2226: Introduce new optional scheduler, using Morsel-driven Parallelism + rayon (#2199)

alamb commented on code in PR #2226:
URL: https://github.com/apache/arrow-datafusion/pull/2226#discussion_r850764524


##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`] that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` threads in its thread pool

Review Comment:
   ```suggestion
       /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
   ```
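
   For context, here is how this constructor pairs with `schedule` -- a minimal usage sketch using only APIs quoted in this diff, with `plan: Arc<dyn ExecutionPlan>` and `task_context: Arc<TaskContext>` assumed in scope inside an async context:

   ```rust
   // Hedged sketch: build a dedicated 4-thread pool, schedule a physical
   // plan, and drain the resulting stream of record batches.
   use futures::TryStreamExt;

   let scheduler = Scheduler::new(4);
   let stream = scheduler.schedule(plan, task_context)?;
   let batches: Vec<_> = stream.try_collect().await?;
   ```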



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan

Review Comment:
   ```suggestion
   /// A [`Scheduler`] creates and manages a pool of dedicated worker threads on which
   /// query execution is scheduled. This is based on the idea of [Morsel-Driven Parallelism]
   /// and is designed to decouple execution parallelism from the parallelism expressed in 
   /// the physical plan as `partitions`. 
   ```
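
   The decoupling this wording describes is visible in the test at the bottom of this file, which sets the two knobs independently -- a condensed sketch (values illustrative):

   ```rust
   // Execution parallelism (rayon pool size) and plan parallelism
   // (target partitions) are configured separately.
   let scheduler = Scheduler::new(8); // 8 dedicated worker threads
   let config = SessionConfig::new().with_target_partitions(2); // 2 plan partitions
   let context = SessionContext::with_config(config);
   ```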



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`] that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` threads in its thread pool
+    pub fn new(num_threads: usize) -> Self {
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .thread_name(|idx| format!("df-worker-{}", idx))
+            .build()
+            .unwrap();
+
+        Self {
+            pool: Arc::new(pool),
+        }
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()

Review Comment:
   Should it maybe check the thread name as well, in case some other part of the system is using rayon?
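
   Something like the following is what that stricter check might look like -- a hypothetical sketch (not part of this PR), assuming the `df-worker-` prefix set in `Scheduler::new`:

   ```rust
   // Hypothetical variant: require both a rayon worker context and the
   // thread name prefix assigned by Scheduler::new.
   fn is_scheduler_worker() -> bool {
       rayon::current_thread_index().is_some()
           && std::thread::current()
               .name()
               .map_or(false, |name| name.starts_with("df-worker-"))
   }
   ```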



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`] that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` threads in its thread pool
+    pub fn new(num_threads: usize) -> Self {
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .thread_name(|idx| format!("df-worker-{}", idx))
+            .build()
+            .unwrap();
+
+        Self {
+            pool: Arc::new(pool),
+        }
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+fn spawn_local(task: Task) {
+    // Verify this is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool with FIFO ordering

Review Comment:
   I think it would help here to explain what guarantees "FIFO" spawning gives -- is it best effort, or can code rely on it?
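
   For reference, my reading of rayon's docs (worth double-checking -- this is not a DataFusion guarantee): `spawn_fifo` tasks run in submission order on the local worker, plain `spawn` is LIFO locally, and stolen tasks may interleave across threads:

   ```rust
   // Illustration only: "a" is expected before "b" while both stay on the
   // same worker; work stealing makes cross-thread ordering best effort.
   rayon::spawn_fifo(|| println!("a"));
   rayon::spawn_fifo(|| println!("b"));
   ```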



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`] that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` threads in its thread pool
+    pub fn new(num_threads: usize) -> Self {
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .thread_name(|idx| format!("df-worker-{}", idx))
+            .build()
+            .unwrap();
+
+        Self {
+            pool: Arc::new(pool),
+        }
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+fn spawn_local(task: Task) {
+    // Verify this is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool with FIFO ordering
+fn spawn_local_fifo(task: Task) {
+    // Verify this is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {
+    pool: Arc<ThreadPool>,
+}
+
+impl Spawner {
+    pub fn spawn(&self, task: Task) {
+        debug!("Spawning {:?} to any worker", task);
+        self.pool.spawn(move || task.do_work());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use std::ops::Range;
+
+    use futures::TryStreamExt;
+    use log::info;
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use datafusion::arrow::array::{ArrayRef, PrimitiveArray};
+    use datafusion::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
+    use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::datasource::{MemTable, TableProvider};
+    use datafusion::physical_plan::displayable;
+    use datafusion::prelude::{SessionConfig, SessionContext};
+
+    use super::*;
+
+    fn generate_primitive<T, R>(
+        rng: &mut R,
+        len: usize,
+        valid_percent: f64,
+        range: Range<T::Native>,
+    ) -> ArrayRef
+    where
+        T: ArrowPrimitiveType,
+        T::Native: SampleUniform,
+        R: Rng,
+    {
+        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+            rng.gen_bool(valid_percent)
+                .then(|| rng.gen_range(range.clone()))
+        })))
+    }
+
+    fn generate_batch<R: Rng>(
+        rng: &mut R,
+        row_count: usize,
+        id_offset: i32,
+    ) -> RecordBatch {
+        let id_range = id_offset..(row_count as i32 + id_offset);
+        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000);
+        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. ..1000.);
+        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", a, true),
+            ("b", b, true),
+            ("id", Arc::new(id), false),
+        ])
+        .unwrap()
+    }
+
+    fn make_batches() -> Vec<Vec<RecordBatch>> {
+        let mut rng = thread_rng();
+
+        let batches_per_partition = 20;
+        let rows_per_batch = 100;
+        let num_partitions = 2;
+
+        let mut id_offset = 0;
+
+        (0..num_partitions)
+            .map(|_| {
+                (0..batches_per_partition)
+                    .map(|_| {
+                        let batch = generate_batch(&mut rng, rows_per_batch, id_offset);
+                        id_offset += rows_per_batch as i32;
+                        batch
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    fn make_provider() -> Arc<dyn TableProvider> {
+        let batches = make_batches();
+        let schema = batches.first().unwrap().first().unwrap().schema();
+        Arc::new(MemTable::try_new(schema, make_batches()).unwrap())
+    }
+
+    fn init_logging() {
+        let _ = env_logger::builder().is_test(true).try_init();
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+
+        context.register_table("table1", make_provider()).unwrap();
+        context.register_table("table2", make_provider()).unwrap();
+
+        let queries = [
+            "select * from table1 order by id",
+            "select * from table1 where table1.a > 100 order by id",
+            "select distinct a from table1 where table1.b > 100 order by a",
+            "select * from table1 join table2 on table1.id = table2.id order by table1.id",
+            "select id from table1 union all select id from table2 order by id",
+            "select id from table1 union all select id from table2 where a > 100 order by id",
+            "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
+            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
+            "select count(*) from table1 where table1.a > 4",
+        ];
+
+        for sql in queries {
+            let task = context.task_ctx();
+
+            let query = context.sql(sql).await.unwrap();
+
+            let plan = query.create_physical_plan().await.unwrap();
+
+            info!("Plan: {}", displayable(plan.as_ref()).indent());
+
+            let stream = scheduler.schedule(plan, task).unwrap();

Review Comment:
   This is a neat way of demonstrating an "end to end" test of this scheduler.



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`] that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` threads in its thread pool
+    pub fn new(num_threads: usize) -> Self {
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .thread_name(|idx| format!("df-worker-{}", idx))
+            .build()
+            .unwrap();
+
+        Self {
+            pool: Arc::new(pool),
+        }
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+fn spawn_local(task: Task) {
+    // Verify this is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool with FIFO ordering
+fn spawn_local_fifo(task: Task) {
+    // Verify this is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {

Review Comment:
   Describing the use of this struct would be helpful, I think. Maybe explain what the wrapper around `Arc<ThreadPool>` is doing other than adding some indirection?
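
   One possible shape for that documentation, inferred from how `Query` and `task.rs` use it (my wording, not the author's):

   ```rust
   /// A cheap, cloneable handle to the scheduler's thread pool. A `Query`
   /// holds a `Spawner` so tasks can be (re)spawned onto the pool without
   /// needing a reference back to the `Scheduler` itself.
   #[derive(Debug, Clone)]
   pub struct Spawner {
       pool: Arc<ThreadPool>,
   }
   ```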



##########
datafusion/scheduler/src/query.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use log::debug;
+
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+use crate::{ArrowResult, Spawner};
+
+/// Identifies the [`Pipeline`] within the [`Query`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`Query`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,

Review Comment:
   I wonder if somewhere you could add a description (or a diagram -- 🙏 ) explaining how the concepts of `Pipeline`, `Query`, `child`, `ExecutionPlan`, and `OutputLink` are connected? It wasn't immediately obvious to me.
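
   Something along these lines, perhaps -- a rough sketch of my own reading of the code (operator names and indices are illustrative, not authoritative):

   ```
   ExecutionPlan tree               Query { pipelines: Vec<RoutablePipeline> }

     SortExec                        pipelines[0]  output: None -> query output
        |                                 ^ OutputLink { pipeline: 0, child: 0 }
     RepartitionExec    --build-->   pipelines[1]  (RepartitionPipeline)
        |                                 ^ OutputLink { pipeline: 1, child: 0 }
     FilterExec -> ScanExec          pipelines[2]  (nodes fused into one
                                                    ExecutionPipeline)
   ```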



##########
datafusion/scheduler/src/query.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use log::debug;
+
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+use crate::{ArrowResult, Spawner};
+
+/// Identifies the [`Pipeline`] within the [`Query`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`Query`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`Query`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct Query {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// The output stream for this query's execution
+    output: mpsc::UnboundedSender<ArrowResult<RecordBatch>>,
+}
+
+impl Drop for Query {
+    fn drop(&mut self) {
+        debug!("Query finished");
+    }
+}
+
+impl Query {
+    /// Creates a new [`Query`] from the provided [`ExecutionPlan`], returning
+    /// an [`mpsc::UnboundedReceiver`] that can be used to receive the results
+    /// of this query's execution
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        task_context: Arc<TaskContext>,
+        spawner: Spawner,
+    ) -> Result<(Query, mpsc::UnboundedReceiver<ArrowResult<RecordBatch>>)> {
+        QueryBuilder::new(plan, task_context).build(spawner)
+    }
+
+    /// Returns a list of this query's [`QueryPipeline`]
+    pub fn pipelines(&self) -> &[RoutablePipeline] {
+        &self.pipelines
+    }
+
+    /// Returns `true` if this query has been dropped, specifically if the
+    /// stream returned by [`super::Scheduler::schedule`] has been dropped
+    pub fn is_cancelled(&self) -> bool {
+        self.output.is_closed()
+    }
+
+    /// Sends `output` to this query's output stream
+    pub fn send_query_output(&self, output: ArrowResult<RecordBatch>) {
+        let _ = self.output.unbounded_send(output);
+    }
+
+    /// Returns the [`Spawner`] associated with this [`Query`]
+    pub fn spawner(&self) -> &Spawner {
+        &self.spawner
+    }
+}
+
+/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
+/// together multiple [`ExecutionPlan`], [`ExecGroup`] stores this state
+struct ExecGroup {

Review Comment:
   I didn't understand the need to add an extra layer of grouping within a pipeline -- I would have expected everything in a pipeline to run together, and if it needed to run separately it would run as a separate pipeline
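
   For what it's worth, my reading of `QueryBuilder` below is that the grouping fuses a run of ordinary operators into a single `ExecutionPipeline`, while pipeline breakers (`RepartitionExec`, `CoalescePartitionsExec`) are split out into their own pipelines -- roughly (hypothetical plan):

   ```
   FilterExec       }  buffered into one ExecGroup
   ProjectionExec   }  -> flushed as a single ExecutionPipeline
   RepartitionExec     -> becomes its own RepartitionPipeline
   ScanExec            -> starts the next ExecGroup
   ```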



##########
datafusion/scheduler/src/query.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use log::debug;
+
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+use crate::{ArrowResult, Spawner};
+
+/// Identifies the [`Pipeline`] within the [`Query`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`Query`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`Query`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct Query {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// The output stream for this query's execution
+    output: mpsc::UnboundedSender<ArrowResult<RecordBatch>>,

Review Comment:
   using an unbounded stream means there is no backpressure on execution, right? As in the query will run to completion as fast as possible, regardless of how fast the consumer consumes it. 
   
   Is there a reason this isn't a bounded receiver?
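
   For comparison, a bounded channel would look like the hypothetical sketch below (`batch` assumed in scope); the awkward part is that `send_query_output` is called synchronously from a worker, so a full buffer needs somewhere to park the producer:

   ```rust
   // Hypothetical bounded variant -- not what this PR does. try_send fails
   // once the 8-batch buffer is full, so the scheduler would have to suspend
   // and re-wake the producing pipeline; the unbounded channel avoids this.
   let (mut tx, _rx) = futures::channel::mpsc::channel::<ArrowResult<RecordBatch>>(8);
   if let Err(_full) = tx.try_send(batch) {
       // would need to park the task and retry, rather than drop the batch
   }
   ```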



##########
datafusion/scheduler/src/query.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use log::debug;
+
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+use crate::{ArrowResult, Spawner};
+
+/// Identifies the [`Pipeline`] within the [`Query`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`Query`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`Query`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct Query {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// The output stream for this query's execution
+    output: mpsc::UnboundedSender<ArrowResult<RecordBatch>>,
+}
+
+impl Drop for Query {
+    fn drop(&mut self) {
+        debug!("Query finished");
+    }
+}
+
+impl Query {
+    /// Creates a new [`Query`] from the provided [`ExecutionPlan`], returning
+    /// an [`mpsc::UnboundedReceiver`] that can be used to receive the results
+    /// of this query's execution
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        task_context: Arc<TaskContext>,
+        spawner: Spawner,
+    ) -> Result<(Query, mpsc::UnboundedReceiver<ArrowResult<RecordBatch>>)> {
+        QueryBuilder::new(plan, task_context).build(spawner)
+    }
+
+    /// Returns a list of this query's [`QueryPipeline`]
+    pub fn pipelines(&self) -> &[RoutablePipeline] {
+        &self.pipelines
+    }
+
+    /// Returns `true` if this query has been dropped, specifically if the
+    /// stream returned by [`super::Scheduler::schedule`] has been dropped
+    pub fn is_cancelled(&self) -> bool {
+        self.output.is_closed()
+    }
+
+    /// Sends `output` to this query's output stream
+    pub fn send_query_output(&self, output: ArrowResult<RecordBatch>) {
+        let _ = self.output.unbounded_send(output);
+    }
+
+    /// Returns the [`Spawner`] associated with this [`Query`]
+    pub fn spawner(&self) -> &Spawner {
+        &self.spawner
+    }
+}
+
+/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
+/// together multiple [`ExecutionPlan`], [`ExecGroup`] stores this state
+struct ExecGroup {
+    /// Where to route the output of the eventual [`Pipeline`]
+    output: Option<OutputLink>,
+
+    /// The [`ExecutionPlan`] from which to start recursing
+    root: Arc<dyn ExecutionPlan>,
+
+    /// The number of times to recurse into the [`ExecutionPlan`]'s children
+    depth: usize,
+}
+
+/// A utility struct to assist converting from [`ExecutionPlan`] to [`Query`]
+///
+/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building
+/// up the [`RoutablePipeline`] for the [`Query`]. As nodes are visited depth-first,
+/// a node is visited only after its parent has been.
+struct QueryBuilder {
+    task_context: Arc<TaskContext>,
+    /// The current list of completed pipelines
+    in_progress: Vec<RoutablePipeline>,
+
+    /// A list of [`ExecutionPlan`] still to visit, along with
+    /// where they should route their output
+    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
+
+    /// Stores one or more [`ExecutionPlan`] to combine together into
+    /// a single [`ExecutionPipeline`]
+    exec_buffer: Option<ExecGroup>,
+}
+
+impl QueryBuilder {
+    fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self {
+        Self {
+            in_progress: vec![],
+            to_visit: vec![(plan, None)],
+            task_context,
+            exec_buffer: None,
+        }
+    }
+
+    /// Flush the current group of [`ExecutionPlan`] stored in `exec_buffer`
+    /// into a single [`ExecutionPipeline`]
+    fn flush_exec(&mut self) -> Result<usize> {
+        let group = self.exec_buffer.take().unwrap();
+        let node_idx = self.in_progress.len();
+        self.in_progress.push(RoutablePipeline {
+            pipeline: Box::new(ExecutionPipeline::new(
+                group.root,
+                self.task_context.clone(),
+                group.depth,
+            )?),
+            output: group.output,
+        });
+        Ok(node_idx)
+    }
+
+    /// Visit a non-special cased [`ExecutionPlan`]
+    fn visit_exec(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        parent: Option<OutputLink>,
+    ) -> Result<()> {
+        let children = plan.children();
+
+        // Add the node to the current group of execution plans to be combined
+        // into a single [`ExecutionPipeline`].
+        //
+        // TODO: More sophisticated policy, just because we can combine them doesn't mean we should

Review Comment:
   Can you explain what the tradeoffs are? At first it would seem to me that the larger a pipeline the better, as there is more work that can be done and so the scheduler has the greatest chance of putting all the threads to use?



##########
datafusion/scheduler/src/task.rs:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{is_worker, spawn_local, spawn_local_fifo, Query};
+use futures::task::ArcWake;
+use log::{debug, trace};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a query using the provided [`Spawner`]
+pub fn spawn_query(query: Arc<Query>) {
+    debug!("Spawning query: {:#?}", query);
+
+    let spawner = query.spawner();
+
+    for (pipeline_idx, query_pipeline) in query.pipelines().iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            spawner.spawn(Task {
+                query: query.clone(),
+                waker: Arc::new(TaskWaker {
+                    query: Arc::downgrade(&query),
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
+/// [`Task`] and distributes them amongst its worker threads.
+///
+/// A [`Query`] is considered completed when it has no outstanding [`Task`]
+pub struct Task {
+    /// Maintain a link to the [`Query`]; this is necessary to be able to
+    /// route the output of the partition to its destination, and also because
+    /// when [`Query`] is dropped it signals completion of query execution
+    query: Arc<Query>,
+
+    /// A [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let output = self.query.pipelines()[self.waker.pipeline].output;
+
+        f.debug_struct("Task")
+            .field("pipeline", &self.waker.pipeline)
+            .field("partition", &self.waker.partition)
+            .field("output", &output)
+            .finish()
+    }
+}
+
+impl Task {
+    /// Call [`Pipeline::poll_partition`] attempting to make progress on query execution
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.query.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`]
+        // this allows us to detect concurrent wake ups and handle them correctly
+        //
+        // We aren't using the wake count to synchronise other memory, and so can
+        // use relaxed memory ordering
+        let wake_count = self.waker.wake_count.load(Ordering::Relaxed);
+
+        let node = self.waker.pipeline;
+        let partition = self.waker.partition;
+
+        let waker = futures::task::waker_ref(&self.waker);
+        let mut cx = Context::from_waker(&*waker);
+
+        let pipelines = self.query.pipelines();
+        let routable = &pipelines[node];
+        match routable.pipeline.poll_partition(&mut cx, partition) {
+            Poll::Ready(Some(Ok(batch))) => {
+                trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
+                match routable.output {
+                    Some(link) => {
+                        trace!(
+                            "Publishing batch to pipeline {:?} partition {}",
+                            link,
+                            partition
+                        );
+                        pipelines[link.pipeline]
+                            .pipeline
+                            .push(batch, link.child, partition)

Review Comment:
   Is it correct that this call may do a non-trivial amount of work (aka this is the "push" -- keep working on the batch in the same thread that produced it)?



##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+pub trait Pipeline: Send + Sync + std::fmt::Debug {
+    /// Push a [`RecordBatch`] to the given input partition

Review Comment:
   Is the expectation that this returns immediately (as in, the pipeline enqueues work that must then be polled until it is done)?
   
   Or does it immediately do the work in the current thread (what I would expect a "push" scheduler to do)?
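
   For reference, the call sites in `task.rs` (quoted elsewhere in this review) use the trait like this; the index values are made up:

   ```rust
   // Inputs are addressed by a (child, partition) pair; outputs by partition
   // alone. Whether push() does real work inline is the open question above.
   pipeline.push(batch, /* child */ 0, /* input partition */ 3);
   let poll = pipeline.poll_partition(&mut cx, /* output partition */ 3);
   ```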



##########
datafusion/scheduler/src/task.rs:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{is_worker, spawn_local, spawn_local_fifo, Query};
+use futures::task::ArcWake;
+use log::{debug, trace};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a query using the provided [`Spawner`]
+pub fn spawn_query(query: Arc<Query>) {
+    debug!("Spawning query: {:#?}", query);
+
+    let spawner = query.spawner();
+
+    for (pipeline_idx, query_pipeline) in query.pipelines().iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            spawner.spawn(Task {

Review Comment:
   To check my understanding:
   
   1. This code creates as many `Task`s as there are "units of work" -- aka (`pipeline`, `partition`) pairs
   2. There is a big free-for-all race where each Task is asked "do you have any output yet"
   3. If the answer is "yes", the scheduler calls `Pipeline::push` immediately with the output batch
   



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// which decouples execution parallelism from the parallelism expressed in the physical plan
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.

Review Comment:
   Can we add some commentary here about how the pipelines are chosen?



##########
datafusion/core/Cargo.toml:
##########
@@ -117,10 +117,6 @@ name = "scalar"
 harness = false
 name = "physical_plan"
 
-[[bench]]
-harness = false
-name = "parquet_query_sql"
-
 [[bench]]

Review Comment:
   Do you think the scheduler should be an optional `feature` of the `core` (eventually)?



##########
datafusion/scheduler/src/task.rs:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{is_worker, spawn_local, spawn_local_fifo, Query};
+use futures::task::ArcWake;

Review Comment:
   Today I learned about `ArcWake` -- the number of specialized things in the Rust futures ecosystem boggles my mind
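
   (For other readers: `ArcWake` is the `futures` helper that turns an `Arc<T>` into a `std::task::Waker` without hand-writing a raw vtable -- a toy impl, not PR code:)

   ```rust
   use std::sync::Arc;
   use futures::task::{waker_ref, ArcWake};

   struct PrintWaker;

   impl ArcWake for PrintWaker {
       fn wake_by_ref(_arc_self: &Arc<Self>) {
           // TaskWaker's real impl re-spawns the corresponding Task instead
           println!("woken");
       }
   }

   let arc = Arc::new(PrintWaker);
   let waker = waker_ref(&arc); // derefs to a Waker for Context::from_waker
   ```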



##########
datafusion/scheduler/src/query.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use log::debug;
+
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+use crate::{ArrowResult, Spawner};
+
+/// Identifies the [`Pipeline`] within the [`Query`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`Query`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`Query`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct Query {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query; pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// The output stream for this query's execution
+    output: mpsc::UnboundedSender<ArrowResult<RecordBatch>>,
+}
+
+impl Drop for Query {
+    fn drop(&mut self) {
+        debug!("Query finished");
+    }
+}
+
+impl Query {
+    /// Creates a new [`Query`] from the provided [`ExecutionPlan`], returning
+    /// an [`mpsc::UnboundedReceiver`] that can be used to receive the results
+    /// of this query's execution
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        task_context: Arc<TaskContext>,
+        spawner: Spawner,
+    ) -> Result<(Query, mpsc::UnboundedReceiver<ArrowResult<RecordBatch>>)> {
+        QueryBuilder::new(plan, task_context).build(spawner)
+    }
+
+    /// Returns a list of this query's [`QueryPipeline`]

Review Comment:
   ```suggestion
       /// Returns a list of this query's [`RoutablePipeline`]
   ```



##########
datafusion/scheduler/src/task.rs:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{is_worker, spawn_local, spawn_local_fifo, Query};
+use futures::task::ArcWake;
+use log::{debug, trace};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a query using its [`Spawner`]
+pub fn spawn_query(query: Arc<Query>) {
+    debug!("Spawning query: {:#?}", query);
+
+    let spawner = query.spawner();
+
+    for (pipeline_idx, query_pipeline) in query.pipelines().iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            spawner.spawn(Task {
+                query: query.clone(),
+                waker: Arc::new(TaskWaker {
+                    query: Arc::downgrade(&query),
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
+/// [`Task`]s and distributes them amongst its worker threads.
+///
+/// A [`Query`] is considered completed when it has no outstanding [`Task`]
+pub struct Task {
+    /// Maintains a link to the [`Query`]. This is necessary to be able to
+    /// route the output of the partition to its destination, and also because
+    /// dropping the [`Query`] signals completion of query execution
+    query: Arc<Query>,
+
+    /// An [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let output = self.query.pipelines()[self.waker.pipeline].output;
+
+        f.debug_struct("Task")
+            .field("pipeline", &self.waker.pipeline)
+            .field("partition", &self.waker.partition)
+            .field("output", &output)
+            .finish()
+    }
+}
+
+impl Task {
+    /// Calls [`Pipeline::poll_partition`], attempting to make progress on query execution
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.query.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`];
+        // this allows us to detect concurrent wake-ups and handle them correctly
+        //
+        // We aren't using the wake count to synchronise other memory, and so can
+        // use relaxed memory ordering

Review Comment:
   I suggest starting with at least sequentially consistent ordering (`Ordering::SeqCst`) for coordination, to be more resilient to hard-to-debug bugs, unless there is a compelling performance measurement showing that `Relaxed` ordering is better
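   To make the trade-off concrete, a small sketch of the two orderings on an `AtomicUsize` (not the PR's code):

   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};

   // Sketch of the counter pattern under discussion
   static WAKE_COUNT: AtomicUsize = AtomicUsize::new(1);

   fn main() {
       // Relaxed: the operation is atomic on WAKE_COUNT itself, but it
       // establishes no happens-before relationship with any other memory
       let before = WAKE_COUNT.load(Ordering::Relaxed);

       // SeqCst: same atomicity, plus participation in a single total
       // order that all threads agree on -- easier to reason about
       WAKE_COUNT.fetch_add(1, Ordering::SeqCst);

       let after = WAKE_COUNT.load(Ordering::SeqCst);
       assert!(after > before);
   }
   ```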



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::debug;
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism],
+/// which decouples execution parallelism from the parallelism expressed in the physical plan.
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`]s that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`]s are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under the hood, these [`Task`]s are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime.
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` threads in its thread pool
+    pub fn new(num_threads: usize) -> Self {
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .thread_name(|idx| format!("df-worker-{}", idx))
+            .build()
+            .unwrap();
+
+        Self {
+            pool: Arc::new(pool),
+        }
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+fn spawn_local(task: Task) {
+    // Verify this is a worker thread, to avoid implicitly creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool with FIFO ordering
+fn spawn_local_fifo(task: Task) {
+    // Verify this is a worker thread, to avoid implicitly creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {
+    pool: Arc<ThreadPool>,
+}
+
+impl Spawner {
+    pub fn spawn(&self, task: Task) {
+        debug!("Spawning {:?} to any worker", task);
+        self.pool.spawn(move || task.do_work());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use std::ops::Range;
+
+    use futures::TryStreamExt;
+    use log::info;
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use datafusion::arrow::array::{ArrayRef, PrimitiveArray};
+    use datafusion::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
+    use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::datasource::{MemTable, TableProvider};
+    use datafusion::physical_plan::displayable;
+    use datafusion::prelude::{SessionConfig, SessionContext};
+
+    use super::*;
+
+    fn generate_primitive<T, R>(
+        rng: &mut R,
+        len: usize,
+        valid_percent: f64,
+        range: Range<T::Native>,
+    ) -> ArrayRef
+    where
+        T: ArrowPrimitiveType,
+        T::Native: SampleUniform,
+        R: Rng,
+    {
+        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+            rng.gen_bool(valid_percent)
+                .then(|| rng.gen_range(range.clone()))
+        })))
+    }
+
+    fn generate_batch<R: Rng>(
+        rng: &mut R,
+        row_count: usize,
+        id_offset: i32,
+    ) -> RecordBatch {
+        let id_range = id_offset..(row_count as i32 + id_offset);
+        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000);
+        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. ..1000.);
+        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", a, true),
+            ("b", b, true),
+            ("id", Arc::new(id), false),
+        ])
+        .unwrap()
+    }
+
+    fn make_batches() -> Vec<Vec<RecordBatch>> {
+        let mut rng = thread_rng();
+
+        let batches_per_partition = 20;
+        let rows_per_batch = 100;
+        let num_partitions = 2;
+
+        let mut id_offset = 0;
+
+        (0..num_partitions)
+            .map(|_| {
+                (0..batches_per_partition)
+                    .map(|_| {
+                        let batch = generate_batch(&mut rng, rows_per_batch, id_offset);
+                        id_offset += rows_per_batch as i32;
+                        batch
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    fn make_provider() -> Arc<dyn TableProvider> {
+        let batches = make_batches();
+        let schema = batches.first().unwrap().first().unwrap().schema();
+        Arc::new(MemTable::try_new(schema, batches).unwrap())
+    }
+
+    fn init_logging() {
+        let _ = env_logger::builder().is_test(true).try_init();
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+
+        context.register_table("table1", make_provider()).unwrap();
+        context.register_table("table2", make_provider()).unwrap();
+
+        let queries = [
+            "select * from table1 order by id",
+            "select * from table1 where table1.a > 100 order by id",
+            "select distinct a from table1 where table1.b > 100 order by a",
+            "select * from table1 join table2 on table1.id = table2.id order by table1.id",
+            "select id from table1 union all select id from table2 order by id",
+            "select id from table1 union all select id from table2 where a > 100 order by id",
+            "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
+            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
+            "select count(*) from table1 where table1.a > 4",
+        ];
+
+        for sql in queries {
+            let task = context.task_ctx();
+
+            let query = context.sql(sql).await.unwrap();
+
+            let plan = query.create_physical_plan().await.unwrap();
+
+            info!("Plan: {}", displayable(plan.as_ref()).indent());
+
+            let stream = scheduler.schedule(plan, task).unwrap();
+            let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+            let expected = query.collect().await.unwrap();
+
+            let total_expected = expected.iter().map(|x| x.num_rows()).sum::<usize>();
+            let total_scheduled = scheduled.iter().map(|x| x.num_rows()).sum::<usize>();
+            assert_eq!(total_expected, total_scheduled);
+
+            info!("Query \"{}\" produced {} rows", sql, total_expected);
+
+            let expected = pretty_format_batches(&expected).unwrap().to_string();
+            let scheduled = pretty_format_batches(&scheduled).unwrap().to_string();
+
+            assert_eq!(

Review Comment:
   `assert_batches_eq!`?
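   For reference, a self-contained sketch of how that macro is used (the batch and expected table here are made up, not this test's output):

   ```rust
   use std::sync::Arc;

   use datafusion::arrow::array::{ArrayRef, Int32Array};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::assert_batches_eq;

   fn main() {
       let id: ArrayRef = Arc::new(Int32Array::from(vec![1]));
       let batch = RecordBatch::try_from_iter([("id", id)]).unwrap();

       // Compares against the pretty-printed form, so a failure shows a
       // readable table diff rather than two opaque strings
       assert_batches_eq!(
           vec![
               "+----+",
               "| id |",
               "+----+",
               "| 1  |",
               "+----+",
           ],
           &[batch]
       );
   }
   ```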



##########
datafusion/scheduler/src/task.rs:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{is_worker, spawn_local, spawn_local_fifo, Query};
+use futures::task::ArcWake;
+use log::{debug, trace};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a query using its [`Spawner`]
+pub fn spawn_query(query: Arc<Query>) {
+    debug!("Spawning query: {:#?}", query);
+
+    let spawner = query.spawner();
+
+    for (pipeline_idx, query_pipeline) in query.pipelines().iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            spawner.spawn(Task {
+                query: query.clone(),
+                waker: Arc::new(TaskWaker {
+                    query: Arc::downgrade(&query),
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
+/// [`Task`]s and distributes them amongst its worker threads.
+///
+/// A [`Query`] is considered completed when it has no outstanding [`Task`]
+pub struct Task {
+    /// Maintains a link to the [`Query`]. This is necessary to be able to
+    /// route the output of the partition to its destination, and also because
+    /// dropping the [`Query`] signals completion of query execution
+    query: Arc<Query>,
+
+    /// An [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let output = self.query.pipelines()[self.waker.pipeline].output;
+
+        f.debug_struct("Task")
+            .field("pipeline", &self.waker.pipeline)
+            .field("partition", &self.waker.partition)
+            .field("output", &output)
+            .finish()
+    }
+}
+
+impl Task {
+    /// Calls [`Pipeline::poll_partition`], attempting to make progress on query execution
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.query.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`];
+        // this allows us to detect concurrent wake-ups and handle them correctly
+        //
+        // We aren't using the wake count to synchronise other memory, and so can
+        // use relaxed memory ordering
+        let wake_count = self.waker.wake_count.load(Ordering::Relaxed);
+
+        let node = self.waker.pipeline;
+        let partition = self.waker.partition;
+
+        let waker = futures::task::waker_ref(&self.waker);
+        let mut cx = Context::from_waker(&*waker);
+
+        let pipelines = self.query.pipelines();
+        let routable = &pipelines[node];
+        match routable.pipeline.poll_partition(&mut cx, partition) {

Review Comment:
   If the existing operators were not `async` / `Future`s, would this code still look like a poll? Or would it do something else?
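   My guess is it would reduce to a plain pull loop with no waker machinery; a sketch under that assumption (this trait is illustrative, not a proposed API):

   ```rust
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::error::Result;

   /// Illustrative only: a blocking pull-based pipeline, in contrast to
   /// the poll-based one in this PR
   trait SyncPipeline {
       /// Returns the next batch for `partition`, or `None` when exhausted.
       /// May block the worker thread while waiting for input.
       fn next_batch(&self, partition: usize) -> Result<Option<RecordBatch>>;
   }

   /// The worker loop then becomes a plain loop with no `Context`/`Waker`
   fn run_partition(pipeline: &dyn SyncPipeline, partition: usize) -> Result<()> {
       while let Some(batch) = pipeline.next_batch(partition)? {
           // route `batch` to the next pipeline / query output here
           let _ = batch;
       }
       Ok(())
   }
   ```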


