You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/26 17:53:42 UTC

[GitHub] [arrow] andygrove opened a new pull request #8283: ARROW-9707: DataFusion Scheduler [WIP]

andygrove opened a new pull request #8283:
URL: https://github.com/apache/arrow/pull/8283


   This PR introduces a scheduler for query execution that breaks a physical plan down into a DAG of query stages based on changes in partitioning. Each stage represents a portion of the query where partitions (tasks) can be executed in parallel on a thread pool.
   
   The intent is for the scheduler to decide how to allocate tasks to threads/cores and move all the threading logic out of the executors themselves. 
   
   The code is based on a working prototype that I had previously implemented in Ballista (also ASL 2.0) and myself and @jorgecarleitao have been the only contributors to this code so far and we both have signed CLAs on file.
   
   The current code compiles but is not complete and doesn't actually work yet. I will try and get this fully working for the 2.0.0 release if others think this is a good approach.
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-726756633


   @andygrove  I wonder what, if anything, you plan to do with this PR now


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r495493604



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {
+            let stage = stage.borrow_mut();
+            let status = stage_status_map.get(&stage.id).unwrap();
+            match status {
+                StageStatus::Pending => {
+                    // have prior stages already completed ?
+                    if stage.prior_stages.iter().all(|id| {
+                        match stage_status_map.get(id) {
+                            Some(StageStatus::Completed) => true,
+                            _ => false,
+                        }
+                    }) {
+                        println!("Running stage {}", stage.id);
+                        let plan = stage
+                            .plan
+                            .as_ref()
+                            .expect("all stages should have plans at execution time");
+
+                        let stage_start = Instant::now();
+
+                        let exec = plan;
+                        let parts = exec.output_partitioning().partition_count();
+
+                        // build queue of tasks per executor
+                        let mut next_executor_id = 0;
+                        let mut executor_tasks: HashMap<String, Vec<ExecutionTask>> =
+                            HashMap::new();
+                        #[allow(clippy::needless_range_loop)]
+                        for i in 0..executors.len() {
+                            //executor_tasks.insert(executors[i].id.clone(), vec![]);
+                        }
+                        for partition in 0..parts {
+                            let task = ExecutionTask::new(
+                                job.id,
+                                stage.id,
+                                partition,

Review comment:
       Yes, The tasks within each stage represent partitions that can be run in parallel.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vertexclique commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
vertexclique commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r497387507



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {

Review comment:
       Can `into_par_iter()` from rayon suitable in here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-700199462


   That makes sense, and we already have some funky channel and thread
   interaction in the DataFusion parquet reader that we could probably adapt
   fairly easily. We could introduce a config setting for max concurrent
   parquet readers.
   
   On Mon, Sep 28, 2020 at 12:12 PM Andrew Lamb <no...@github.com>
   wrote:
   
   > When I run the TPC-H query I am testing against a data set that has 240
   > Parquet files. If we just try and run everything at once with async/await
   > and have tokio do the scheduling, we will end up with 240 files open at
   > once with reads happening against all of them, which is inefficient.
   >
   > One way to avoid this type of resource usage explosion is if the Parquet
   > reader itself limits the number of outstanding Tasks that it submits. For
   > example, with a tokio channel or something.
   >
   > It seems to me the challenge is not really "scheduling" per se, but more
   > "resource allocation"
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/arrow/pull/8283#issuecomment-700197576>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AAHEBRGAPSBS2HWZRE2PI73SIDGX5ANCNFSM4R3A4JHA>
   > .
   >
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r495617565



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {
+            let stage = stage.borrow_mut();
+            let status = stage_status_map.get(&stage.id).unwrap();
+            match status {
+                StageStatus::Pending => {
+                    // have prior stages already completed ?
+                    if stage.prior_stages.iter().all(|id| {
+                        match stage_status_map.get(id) {
+                            Some(StageStatus::Completed) => true,
+                            _ => false,
+                        }
+                    }) {
+                        println!("Running stage {}", stage.id);
+                        let plan = stage
+                            .plan
+                            .as_ref()
+                            .expect("all stages should have plans at execution time");
+
+                        let stage_start = Instant::now();
+
+                        let exec = plan;
+                        let parts = exec.output_partitioning().partition_count();
+
+                        // build queue of tasks per executor
+                        let mut next_executor_id = 0;
+                        let mut executor_tasks: HashMap<String, Vec<ExecutionTask>> =
+                            HashMap::new();
+                        #[allow(clippy::needless_range_loop)]
+                        for i in 0..executors.len() {
+                            //executor_tasks.insert(executors[i].id.clone(), vec![]);
+                        }
+                        for partition in 0..parts {
+                            let task = ExecutionTask::new(
+                                job.id,
+                                stage.id,
+                                partition,
+                                plan.clone(),
+                                shuffle_location_map.clone(),
+                            );
+
+                            // load balance across the executors
+                            let executor_meta = &executors[next_executor_id];
+                            next_executor_id += 1;
+                            if next_executor_id == executors.len() {
+                                next_executor_id = 0;
+                            }
+
+                            let queue = executor_tasks
+                                .get_mut(&executor_meta.id)
+                                .expect("executor queue should exist");
+
+                            queue.push(task);

Review comment:
       I think I understand what you are saying.
   
   For what it is worth, I think this code will intersect with the async executor (as with the async mode we'll already have a "scheduler" in something that maps works to threads, assuming we break up the work into tasks.
   
   We may want a scheduler that prevents too much of the plan starting (aka not starting all stages at once) but I am bullish about the async approach!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699591199


   This is super exciting!!!! Thanks a lot @andygrove for pushing this through!
   
   I am trying to understand how this is related to other executing architectures in Rust (e.g. Tokyo) and more broadly to the traits in the crate [`futures`](https://docs.rs/crate/futures/0.3.5).
   
   My current hypothesis is that this may be simpler if we leverage some of the existing traits in `futures`.
   
   In my current understanding of DataFusion's Execlution plan, `ExecutionPlan` behaves like an `IntoIter` whose Iterator iterates over parts of a partition, via `execute(part_index)`, where `part_index`'s range is given by `output_partitioning().partition_count()` (Longer version [here](https://docs.google.com/document/d/1yREyFSA1Fx1WMC0swUAZBbNlKPmTyebzAiQ8Eu6Ynpg/edit?usp=sharing)).
   
   In fact, with [some dull changes](https://github.com/jorgecarleitao/arrow/pull/8) (test [here](https://github.com/jorgecarleitao/arrow/pull/8/files#diff-f92f5cc2c20e4cfba21c282e728d53e4R68-R92)), we can iterate over a full partition as follows:
   
   ```rust
   let plan: &dyn ExecutionPlan = ...
   
   plan.into_iter().map(|maybe_part| {
       let part = maybe_part.unwrap();  // todo handle error via special flatten
       part.into_iter().map(|maybe_batch| {
           println!("{:?}", maybe_batch?.num_rows());
           Ok(())
       })
   })
   .flatten()
   .collect::<Result<()>>()?;
   ```
   
   The problem with this is that it requires each node to be responsible for spawning threads, which, as identified in ARROW-9707, is problematic.
   
   To address this, we need to make `execute` something that a scheduler can chew in parts. For that reason, I think that #8285 , that proposes `execute` to be `async`, is beautiful!
   
   **But**, If `execute` is `async`, then I think that `ExecutionPlan` could implement [`IntoStream`](https://docs.rs/futures/0.3.5/futures/future/struct.IntoStream.html) in the same way it can currently implement [`IntoIter`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html). In this scenario, `ExecutionPlan` would become
   
   >  an object that knows know to convert itself into a stream of record batch iterators
   
   The advantage is that any scheduler that consumes [`futures::Stream`](https://docs.rs/futures/0.3.5/futures/stream/trait.Stream.html) can pick this stream and execute it. Since it now knows how to stop in the middle of `execute` in case something is blocking it (including any of its `input`s), it can switch tasks ad-hoc. In other words, IMO we can leverage any async library to run this stream.
   
   In this scenario, one idea to address ARROW-9707 is:
   
   1. land #8285
   2. implement `IntoStream` for `ExecutionPlan`
   3. migrate our calls of `thread:spawn`  in the different nodes to `.await`
   4. pick a scheduler from the shelf and run the stream using it.
   
   If we want some customization (e.g. logging, some special naming for the tasks), we can always add a `trait DataFusionStream: Stream<...>` and implement it on top of `Stream`.
   
   Note that this would effectively make our `ExecutionPlan` to be a dynamically-typed stream adapter (a-la [futures::stream::Select](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.9/futures/stream/struct.Select.html), but dynamic): it consumes one or more streams, and it outputs a new stream. Conceptually, IMO this is exactly what an `ExecutionPlan` plan is.
   
   In a distributed environment, the main difference would be that the physical planner would plan `ExecutionPlan` whose `execute` includes submitting a job to a worker and wait for the result (e.g. via a TCP channel); the result is still a stream though, and the scheduler can decide to wait for network + node's compute and perform a switch, the same way it can wait for I/O.
   
   I am sorry that I only came up with this idea now. I finally understood the implications of #8285.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r505295915



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {

Review comment:
       I tried it Rayon here: https://github.com/jorgecarleitao/arrow/pull/15/files, and I think that it may be the way to go: leverage Rayon's thread pool for CPU-bounded stuff, and tokio's thread pool for non-blocking stuff (e.g. network).
   Tokio itself recommends using Rayon for blocking stuff.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699591199


   This is super exciting!!!! Thanks a lot @andygrove for pushing this through!
   
   I am trying to understand how this is related to other executing architectures in Rust (e.g. Tokyo) and more broadly to the traits in the crate [`futures`](https://docs.rs/crate/futures/0.3.5).
   
   My current hypothesis is that this may be simpler if we leverage some of the existing traits in `futures`.
   
   In my current understanding of DataFusion's Execlution plan, `ExecutionPlan` behaves like an `IntoIter` whose Iterator iterates over parts of a partition, via `execute(part_index)`, where `part_index`'s range is given by `output_partitioning().partition_count()` (Longer version [here](https://docs.google.com/document/d/1yREyFSA1Fx1WMC0swUAZBbNlKPmTyebzAiQ8Eu6Ynpg/edit?usp=sharing)).
   
   In fact, with [some dull changes](https://github.com/jorgecarleitao/arrow/pull/8) (test [here](https://github.com/jorgecarleitao/arrow/pull/8/files#diff-f92f5cc2c20e4cfba21c282e728d53e4R68-R92)), we can iterate over a full partition as follows:
   
   ```rust
   let plan: &dyn ExecutionPlan = ...
   
   plan.into_iter().map(|maybe_part| {
       let part = maybe_part.unwrap();  // todo handle error via special flatten
       part.into_iter().map(|maybe_batch| {
           println!("{:?}", maybe_batch?.num_rows());
           Ok(())
       })
   })
   .flatten()
   .collect::<Result<()>>()?;
   ```
   
   The problem with this is that it requires each node to be responsible for spawning threads, which, as identified in ARROW-9707, is problematic.
   
   To address this, we need to make `execute` something that a scheduler can chew in parts. For that reason, I think that #8285 , that proposes `execute` to be `async`, is beautiful!
   
   **But**, If `execute` is `async`, then I think that `ExecutionPlan` could implement [`IntoStream`](https://docs.rs/futures/0.3.5/futures/future/struct.IntoStream.html). In this scenario, `ExecutionPlan` would become
   
   >  an object that knows know to convert itself into a stream of record batch iterators
   
   The advantage is that any scheduler that consumes [`futures::Stream`](https://docs.rs/futures/0.3.5/futures/stream/trait.Stream.html) can pick this stream and execute it. Since it now knows how to stop in the middle of `execute` in case something is blocking it (including any of its `input`s), it can switch tasks ad-hoc. In other words, IMO we can leverage any async library to run this stream.
   
   In this scenario, one idea to address ARROW-9707 is:
   
   1. land #8285
   2. implement `IntoStream` for `ExecutionPlan`
   3. migrate our calls of `thread:spawn`  in the different nodes to `.await`
   4. pick a scheduler from the shelf and run the stream using it.
   
   If we want some customization (e.g. logging, some special naming for the tasks), we can always add a `trait DataFusionStream: Stream<...>` and implement it on top of `Stream`.
   
   Note that this would effectively make our `ExecutionPlan` to be a dynamically-typed stream adapter (a-la [futures::stream::Select](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.9/futures/stream/struct.Select.html), but dynamic): it consumes one or more streams, and it outputs a new stream. Conceptually, IMO this is exactly what an `ExecutionPlan` plan is.
   
   In a distributed environment, the main difference would be that the physical planner would plan `ExecutionPlan` whose `execute` includes submitting a job to a worker and wait for the result (e.g. via a TCP channel); the result is still a stream though, and the scheduler can decide to wait for network + node's compute and perform a switch, the same way it can wait for I/O.
   
   I am sorry that I only came up with this idea now. I finally understood the implications of #8285.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r505414968



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {

Review comment:
       I left some comments on https://github.com/jorgecarleitao/arrow/pull/15/files#r505414319




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699658056


   It makes sense, @andygrove.
   
   Note that I do not disagree with us having a custom scheduler. I was noting that we could separate the two problems: one problem is creation of tasks, and the other is the scheduling of said tasks. When we spawn tasks in `executionPlan::execute`, we may be mixing these two.
   
   The same way that we expose execution details of a node via (`output_partitioning`), we can expose other execution details (e.g. `is_scan()`), that our custom scheduler can use to e.g. limit concurrency.
   
   Regardless, this is in no way a blocker or anything, and we can leave this discussion for a future iteration - I agree that what you are doing here is really great stuff and we should go for it 🚀 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8283: ARROW-9707: DataFusion Scheduler [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699527610


   FYI @jorgecarleitao @alamb @alippai @vertexclique @svenwb since you have all commented on the related JIRA and design document.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alippai edited a comment on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alippai edited a comment on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699695827


   @andygrove I think now you understand all my issues I had previously. The scheduler proposal and the recent comments regarding the concurrency are all superb, I think you are on track. Thanks for listening for my newbie concerns.
   
   My only note: https://github.com/apache/arrow/pull/8283#issuecomment-699655553 likely you want to read a largish partition in "one go". AFAIR HDFS creates ~128MB large parquet chunks. Reading ~100MB large parquet files, or large columns with tens of MBs of data in one go will likely increase the throughput. While using local disks values over a few MBs won't make any difference, but using S3, HDFS, GPFS, NFS it can be beneficial. 
   
   I couldn't find how the TPC-H parquet files you test with are structured, can you give me some pointers?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r495494081



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {
+            let stage = stage.borrow_mut();
+            let status = stage_status_map.get(&stage.id).unwrap();
+            match status {
+                StageStatus::Pending => {
+                    // have prior stages already completed ?
+                    if stage.prior_stages.iter().all(|id| {
+                        match stage_status_map.get(id) {
+                            Some(StageStatus::Completed) => true,
+                            _ => false,
+                        }
+                    }) {
+                        println!("Running stage {}", stage.id);
+                        let plan = stage
+                            .plan
+                            .as_ref()
+                            .expect("all stages should have plans at execution time");
+
+                        let stage_start = Instant::now();
+
+                        let exec = plan;
+                        let parts = exec.output_partitioning().partition_count();
+
+                        // build queue of tasks per executor
+                        let mut next_executor_id = 0;
+                        let mut executor_tasks: HashMap<String, Vec<ExecutionTask>> =
+                            HashMap::new();
+                        #[allow(clippy::needless_range_loop)]
+                        for i in 0..executors.len() {
+                            //executor_tasks.insert(executors[i].id.clone(), vec![]);
+                        }
+                        for partition in 0..parts {
+                            let task = ExecutionTask::new(
+                                job.id,
+                                stage.id,
+                                partition,
+                                plan.clone(),
+                                shuffle_location_map.clone(),
+                            );
+
+                            // load balance across the executors
+                            let executor_meta = &executors[next_executor_id];
+                            next_executor_id += 1;
+                            if next_executor_id == executors.len() {
+                                next_executor_id = 0;
+                            }
+
+                            let queue = executor_tasks
+                                .get_mut(&executor_meta.id)
+                                .expect("executor queue should exist");
+
+                            queue.push(task);

Review comment:
       Within each query stage, it is still a pull model. The leaf query stages will typically be a scan wrapped by some other operators. With the currently supported DataFusion operators, the `MergeExec` is essentially the shuffle exchange that would represent query stage boundaries. For example, `LocalLimit` and `HashAggregate(mode=Final)` operators would not start executing until their input query stages complete. This is not dissimilar to how `MergeExec` works today in that it blocks on the input threads, but it is occupying a thread itself while it waits in the current model but with the scheduler model, the `MergeExec` (possibly replaced with `ShuffleReader`) would not start running until the input stages have completed. I'm not sure if that totally answers your question but hopefully sheds a little more light on this design.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove closed pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8283:
URL: https://github.com/apache/arrow/pull/8283


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r497406955



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {

Review comment:
       I think Rayon has its own thread pool, so if we used into_par_iter() / Rayon that might interfere / confuse things with using an async executor.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alippai commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alippai commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699695827


   @andygrove I think now you understand all my issues I had previously. The scheduler proposal and the recent comments regarding the concurrency are all superb, I think you are on track. Thanks for listening about my fears previously.
   
   My only note: https://github.com/apache/arrow/pull/8283#issuecomment-699655553 likely you want to read a largish partition in "one go". AFAIR HDFS creates ~128MB large parquet chunks. Reading ~100MB large parquet files, or large columns with tens of MBs of data in one go will likely increase the throughput. While using local disks values over a few MBs won't make any difference, but using S3, HDFS, GPFS, NFS it can be beneficial. 
   
   I couldn't find how the TPC-H parquet files you test with are structured, can you give me some pointers?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-727190329


   @andygrove  your description makes sense. Thank you for the ideas that this PR sparked. 
   
   I agree that an csv --> parquet converter would be really useful and cool. We will likely need much of the same underlying parts (arrow->parquet, with sorting and partitioning) for IOx, so I am hopeful we can contribute in that area as well


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-700198208


   BTW @jorgecarleitao  -- I really like your ideas regarding using async streams in `ExecutionPlan` -- I think it sounds like a very elegant way to implement back pressure (and avoid starting too many things at once)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler [WIP]

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699528805


   https://issues.apache.org/jira/browse/ARROW-9707


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-726811184


   @alamb That is a good question. I have been too busy at work lately to work on Arrow/DataFusion/Ballista but I have been spending some time contemplating where to go next.
   
   This scheduler prototype is interesting, but until we have partitioning, shuffles, joins, and async working smoothly, perhaps there isn't much point working on the scheduler yet. I would be ok with closing this PR for now. It also might be premature for me to try and contribute a scheduler to DataFusion since I am really just prototyping this right now and lack experience in this area.
   
   If we had partitioning, shuffles, and joins in DataFusion, it would mean we could run a much wider range of TPC-H queries on a single node, and we could also build some nice command line utilities for converting data sets from CSV and Parquet with repartioning which would be quite compelling for a lot of people IMO and could attract new contributors.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#discussion_r495491244



##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {
+            let stage = stage.borrow_mut();
+            let status = stage_status_map.get(&stage.id).unwrap();
+            match status {
+                StageStatus::Pending => {
+                    // have prior stages already completed ?
+                    if stage.prior_stages.iter().all(|id| {
+                        match stage_status_map.get(id) {
+                            Some(StageStatus::Completed) => true,
+                            _ => false,
+                        }
+                    }) {
+                        println!("Running stage {}", stage.id);
+                        let plan = stage
+                            .plan
+                            .as_ref()
+                            .expect("all stages should have plans at execution time");
+
+                        let stage_start = Instant::now();
+
+                        let exec = plan;
+                        let parts = exec.output_partitioning().partition_count();
+
+                        // build queue of tasks per executor
+                        let mut next_executor_id = 0;
+                        let mut executor_tasks: HashMap<String, Vec<ExecutionTask>> =
+                            HashMap::new();
+                        #[allow(clippy::needless_range_loop)]
+                        for i in 0..executors.len() {
+                            //executor_tasks.insert(executors[i].id.clone(), vec![]);
+                        }
+                        for partition in 0..parts {
+                            let task = ExecutionTask::new(
+                                job.id,
+                                stage.id,
+                                partition,

Review comment:
       Does this call imply that an execution plan node with multiple partitions could be run by multiple threads concurrently?
   

##########
File path: rust/datafusion/src/scheduler/mod.rs
##########
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::ExecutionError;
+use crate::error::Result;
+use crate::physical_plan::shuffle::{ShuffleExchangeExec, ShuffleReaderExec};
+use crate::physical_plan::ExecutionPlan;
+
+use crate::execution::context::ExecutionContext;
+use uuid::Uuid;
+
+/// A Job typically represents a single query and the query is executed in stages. Stages are
+/// separated by map operations (shuffles) to re-partition data before the next stage starts.
+#[derive(Debug)]
+pub struct Job {
+    /// Job UUID
+    pub id: Uuid,
+    /// A list of stages within this job. There can be dependencies between stages to form
+    /// a directed acyclic graph (DAG).
+    pub stages: Vec<Rc<RefCell<Stage>>>,
+    /// The root stage id that produces the final results
+    pub root_stage_id: usize,
+}
+
+impl Job {
+    pub fn explain(&self) {
+        println!("Job {} has {} stages:\n", self.id, self.stages.len());
+        self.stages.iter().for_each(|stage| {
+            let stage = stage.as_ref().borrow();
+            println!("Stage {}:\n", stage.id);
+            if stage.prior_stages.is_empty() {
+                println!("Stage {} has no dependencies.", stage.id);
+            } else {
+                println!(
+                    "Stage {} depends on stages {:?}.",
+                    stage.id, stage.prior_stages
+                );
+            }
+            println!(
+                "\n{:?}\n",
+                stage
+                    .plan
+                    .as_ref()
+                    .expect("Stages should always have a plan")
+            );
+        })
+    }
+}
+
+/// A query stage represents a portion of a physical plan with the same partitioning
+/// scheme throughout, meaning that each partition can be executed in parallel. Query
+/// stages form a DAG.
+#[derive(Debug)]
+pub struct Stage {
+    /// Stage id which is unique within a job.
+    pub id: usize,
+    /// A list of stages that must complete before this stage can execute.
+    pub prior_stages: Vec<usize>,
+    /// The physical plan to execute for this stage
+    pub plan: Option<Arc<dyn ExecutionPlan>>,
+}
+
+impl Stage {
+    /// Create a new empty stage with the specified id.
+    fn new(id: usize) -> Self {
+        Self {
+            id,
+            prior_stages: vec![],
+            plan: None,
+        }
+    }
+}
+
+/// Task that can be sent to an executor for execution. Tasks represent single partitions
+/// within stagees.
+#[derive(Debug, Clone)]
+pub struct ExecutionTask {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+}
+
+impl ExecutionTask {
+    pub fn new(
+        job_uuid: Uuid,
+        stage_id: usize,
+        partition_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        shuffle_locations: HashMap<ShuffleId, ExecutorMeta>,
+    ) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+            plan,
+            shuffle_locations,
+        }
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}.{}.{}", self.job_uuid, self.stage_id, self.partition_id)
+    }
+}
+
+/// Unique identifier for the output shuffle partition of an operator.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ShuffleId {
+    pub(crate) job_uuid: Uuid,
+    pub(crate) stage_id: usize,
+    pub(crate) partition_id: usize,
+}
+
+impl ShuffleId {
+    pub fn new(job_uuid: Uuid, stage_id: usize, partition_id: usize) -> Self {
+        Self {
+            job_uuid,
+            stage_id,
+            partition_id,
+        }
+    }
+}
+
+/// Create a Job (DAG of stages) from a physical execution plan.
+pub fn create_job(plan: Arc<dyn ExecutionPlan>) -> Result<Job> {
+    let mut scheduler = JobScheduler::new();
+    scheduler.create_job(plan)?;
+    Ok(scheduler.job)
+}
+
+pub struct JobScheduler {
+    job: Job,
+    next_stage_id: usize,
+}
+
+impl JobScheduler {
+    fn new() -> Self {
+        let job = Job {
+            id: Uuid::new_v4(),
+            stages: vec![],
+            root_stage_id: 0,
+        };
+        Self {
+            job,
+            next_stage_id: 0,
+        }
+    }
+
+    fn create_job(&mut self, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let new_stage_id = self.next_stage_id;
+        self.next_stage_id += 1;
+        let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+        self.job.stages.push(new_stage.clone());
+        let plan = self.visit_plan(plan, new_stage.clone())?;
+        new_stage.as_ref().borrow_mut().plan = Some(plan);
+        Ok(())
+    }
+
+    fn visit_plan(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        current_stage: Rc<RefCell<Stage>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(exchange) =
+            plan.as_ref().as_any().downcast_ref::<ShuffleExchangeExec>()
+        {
+            // shuffle indicates that we need a new stage
+            let new_stage_id = self.next_stage_id;
+            self.next_stage_id += 1;
+            let new_stage = Rc::new(RefCell::new(Stage::new(new_stage_id)));
+            self.job.stages.push(new_stage.clone());
+
+            // the children need to be part of this new stage
+            let shuffle_input =
+                self.visit_plan(exchange.child.clone(), new_stage.clone())?;
+
+            new_stage.as_ref().borrow_mut().plan = Some(shuffle_input);
+
+            // the current stage depends on this new stage
+            current_stage
+                .as_ref()
+                .borrow_mut()
+                .prior_stages
+                .push(new_stage_id);
+
+            // return a shuffle reader to read the results from the stage
+            let n = exchange.child.output_partitioning().partition_count();
+
+            let shuffle_id = (0..n)
+                .map(|n| ShuffleId {
+                    job_uuid: self.job.id,
+                    stage_id: new_stage_id,
+                    partition_id: n,
+                })
+                .collect();
+            Ok(Arc::new(ShuffleReaderExec::new(
+                exchange.schema(),
+                shuffle_id,
+            )))
+        } else {
+            let new_children = plan
+                .children()
+                .iter()
+                .map(|child| self.visit_plan(child.clone(), current_stage.clone()))
+                .collect::<Result<Vec<_>>>()?;
+            plan.with_new_children(new_children)
+        }
+    }
+}
+
+enum StageStatus {
+    Pending,
+    Completed,
+}
+
+enum TaskStatus {
+    Pending(Instant),
+    Running(Instant),
+    Completed(ShuffleId),
+    Failed(String),
+}
+
+#[derive(Debug, Clone)]
+struct ExecutorShuffleIds {
+    executor_id: String,
+    shuffle_ids: Vec<ShuffleId>,
+}
+
+/// Execute a job directly against executors as starting point
+pub async fn execute_job(job: &Job, ctx: &ExecutionContext) -> Result<Vec<RecordBatch>> {
+    let executors: Vec<ExecutorMeta> = vec![]; //ctx.get_executor_ids().await?;
+
+    println!("Executors: {:?}", executors);
+
+    if executors.is_empty() {
+        println!("no executors found");
+        return Err(ExecutionError::General(
+            "no executors available".to_string(),
+        ));
+    }
+
+    let mut shuffle_location_map: HashMap<ShuffleId, ExecutorMeta> = HashMap::new();
+
+    let mut stage_status_map = HashMap::new();
+
+    for stage in &job.stages {
+        let stage = stage.borrow_mut();
+        stage_status_map.insert(stage.id, StageStatus::Pending);
+    }
+
+    // loop until all stages are complete
+    let mut num_completed = 0;
+    while num_completed < job.stages.len() {
+        num_completed = 0;
+
+        //TODO do stages in parallel when possible
+        for stage in &job.stages {
+            let stage = stage.borrow_mut();
+            let status = stage_status_map.get(&stage.id).unwrap();
+            match status {
+                StageStatus::Pending => {
+                    // have prior stages already completed ?
+                    if stage.prior_stages.iter().all(|id| {
+                        match stage_status_map.get(id) {
+                            Some(StageStatus::Completed) => true,
+                            _ => false,
+                        }
+                    }) {
+                        println!("Running stage {}", stage.id);
+                        let plan = stage
+                            .plan
+                            .as_ref()
+                            .expect("all stages should have plans at execution time");
+
+                        let stage_start = Instant::now();
+
+                        let exec = plan;
+                        let parts = exec.output_partitioning().partition_count();
+
+                        // build queue of tasks per executor
+                        let mut next_executor_id = 0;
+                        let mut executor_tasks: HashMap<String, Vec<ExecutionTask>> =
+                            HashMap::new();
+                        #[allow(clippy::needless_range_loop)]
+                        for i in 0..executors.len() {
+                            //executor_tasks.insert(executors[i].id.clone(), vec![]);
+                        }
+                        for partition in 0..parts {
+                            let task = ExecutionTask::new(
+                                job.id,
+                                stage.id,
+                                partition,
+                                plan.clone(),
+                                shuffle_location_map.clone(),
+                            );
+
+                            // load balance across the executors
+                            let executor_meta = &executors[next_executor_id];
+                            next_executor_id += 1;
+                            if next_executor_id == executors.len() {
+                                next_executor_id = 0;
+                            }
+
+                            let queue = executor_tasks
+                                .get_mut(&executor_meta.id)
+                                .expect("executor queue should exist");
+
+                            queue.push(task);

Review comment:
       I wonder how this will handle data flowing in a plan where a producer (eg scan) can make data faster than the consumer (eg aggregate) can consume it. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-700197576


   > When I run the TPC-H query I am testing against a data set that has 240 Parquet files. If we just try and run everything at once with async/await and have tokio do the scheduling, we will end up with 240 files open at once with reads happening against all of them, which is inefficient.
   
   One way to avoid this type of resource usage explosion is if the Parquet reader itself limits the number of outstanding `Task`s that it submits. For example, with a tokio channel or something.
   
   It seems to me the challenge is not really "scheduling" per se, but more "resource allocation"


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8283: ARROW-9707: [Rust] [DataFusion] DataFusion Scheduler Prototype [WIP]

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699655553


   @jorgecarleitao Async/await helps a lot but we also need our own scheduler to orchestrate how a query is executed. I am going to write up something more detailed with my reasoning on this soon but here is one example. When I run the TPC-H query I am testing against a data set that has 240 Parquet files. If we just try and run everything at once with async/await and have tokio do the scheduling, we will end up with 240 files open at once with reads happening against all of them, which is inefficient. It is better to process a smaller number of files concurrently (better use of page caches, fewer file handles open, etc) and process them in batches. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org