Posted to commits@arrow.apache.org by nj...@apache.org on 2023/07/13 01:26:41 UTC

[arrow-ballista] branch main updated: Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dfdfeaf  Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)
5dfdfeaf is described below

commit 5dfdfeaf57f953b62992ac3c1ae592dfb655f631
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Jul 13 09:26:37 2023 +0800

     Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)
    
    * Remove ExecutorReservation and change the task assignment philosophy from executor first to task first
    
    * Add PENDING_JOBS_METRIC and RUNNING_JOBS_METRIC to replace INFLIGHT_TASKS_METRIC_NAME
    
    ---------
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/scheduler/src/cluster/kv.rs               | 271 +++------
 ballista/scheduler/src/cluster/memory.rs           | 216 ++------
 ballista/scheduler/src/cluster/mod.rs              | 474 +++++++++++++---
 ballista/scheduler/src/cluster/storage/mod.rs      |  11 -
 ballista/scheduler/src/cluster/test/mod.rs         | 587 --------------------
 ballista/scheduler/src/cluster/test_util/mod.rs    | 214 ++++++++
 ballista/scheduler/src/scheduler_server/event.rs   |  18 +-
 .../src/scheduler_server/external_scaler.rs        |  21 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    | 114 ++--
 ballista/scheduler/src/scheduler_server/mod.rs     |  77 +--
 .../src/scheduler_server/query_stage_scheduler.rs  | 436 +++++----------
 ballista/scheduler/src/state/execution_graph.rs    |  79 ++-
 ballista/scheduler/src/state/executor_manager.rs   | 568 +++++--------------
 ballista/scheduler/src/state/mod.rs                | 608 ++++++---------------
 ballista/scheduler/src/state/task_manager.rs       | 129 ++---
 ballista/scheduler/src/test_utils.rs               |  26 +-
 16 files changed, 1412 insertions(+), 2437 deletions(-)
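
The diff below replaces the slot-reservation flow (reserve_slots, reserve_slots_exact, cancel_reservations) with a task-first flow (bind_schedulable_tasks, unbind_tasks): rather than reserving executor slots and then looking for tasks to fill them, the scheduler walks the runnable tasks of the active jobs and binds each task to an executor that still has a free slot. The following is a minimal, self-contained sketch of that idea; PendingTask and SlotPool are simplified stand-ins for illustration only, not the TaskDescription, AvailableTaskSlots, BoundTask, or ExecutorSlot types introduced by the diff.

    // Simplified stand-ins; the real scheduler binds TaskDescription values
    // to AvailableTaskSlots and returns Vec<BoundTask>.
    struct PendingTask { job_id: String, partition: usize }
    struct SlotPool { executor_id: String, slots: u32 }

    // Task-first binding with the "bias" strategy: fill the executor with the
    // most free slots before moving on to the next one.
    fn bind_tasks(tasks: Vec<PendingTask>, mut pool: Vec<SlotPool>) -> Vec<(String, PendingTask)> {
        pool.sort_by(|a, b| b.slots.cmp(&a.slots));
        let mut bound = Vec::new();
        let mut idx = 0;
        for task in tasks {
            // Skip executors whose slots are exhausted.
            while idx < pool.len() && pool[idx].slots == 0 {
                idx += 1;
            }
            if idx >= pool.len() {
                break; // no capacity left; remaining tasks stay pending
            }
            pool[idx].slots -= 1;
            bound.push((pool[idx].executor_id.clone(), task));
        }
        bound
    }

    fn main() {
        let tasks = (0..4)
            .map(|p| PendingTask { job_id: "job_a".into(), partition: p })
            .collect();
        let pool = vec![
            SlotPool { executor_id: "executor_1".into(), slots: 1 },
            SlotPool { executor_id: "executor_2".into(), slots: 2 },
        ];
        for (executor, task) in bind_tasks(tasks, pool) {
            println!("{executor} <- {}/{}", task.job_id, task.partition);
        }
    }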

diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs
index effbc0cc..a8852fb6 100644
--- a/ballista/scheduler/src/cluster/kv.rs
+++ b/ballista/scheduler/src/cluster/kv.rs
@@ -17,13 +17,14 @@
 
 use crate::cluster::storage::{KeyValueStore, Keyspace, Lock, Operation, WatchEvent};
 use crate::cluster::{
-    reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
-    JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
+    bind_task_bias, bind_task_round_robin, BoundTask, ClusterState,
+    ExecutorHeartbeatStream, ExecutorSlot, JobState, JobStateEvent, JobStateEventStream,
+    JobStatus, TaskDistribution,
 };
 use crate::scheduler_server::{timestamp_secs, SessionBuilder};
 use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
 use crate::state::session_manager::create_datafusion_context;
+use crate::state::task_manager::JobInfoCache;
 use crate::state::{decode_into, decode_protobuf};
 use async_trait::async_trait;
 use ballista_core::config::BallistaConfig;
@@ -174,12 +175,12 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         Ok(())
     }
 
-    async fn reserve_slots(
+    async fn bind_schedulable_tasks(
         &self,
-        num_slots: u32,
         distribution: TaskDistribution,
+        active_jobs: Arc<HashMap<String, JobInfoCache>>,
         executors: Option<HashSet<String>>,
-    ) -> Result<Vec<ExecutorReservation>> {
+    ) -> Result<Vec<BoundTask>> {
         let lock = self.store.lock(Keyspace::Slots, "global").await?;
 
         with_lock(lock, async {
@@ -192,7 +193,7 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     ))
                 })?;
 
-            let mut available_slots: Vec<&mut AvailableTaskSlots> = slots
+            let available_slots: Vec<&mut AvailableTaskSlots> = slots
                 .task_slots
                 .iter_mut()
                 .filter_map(|data| {
@@ -205,82 +206,33 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 })
                 .collect();
 
-            available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
-            let reservations = match distribution {
-                TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
+            let bound_tasks = match distribution {
+                TaskDistribution::Bias => {
+                    bind_task_bias(available_slots, active_jobs, |_| false).await
+                }
                 TaskDistribution::RoundRobin => {
-                    reserve_slots_round_robin(available_slots, num_slots)
+                    bind_task_round_robin(available_slots, active_jobs, |_| false).await
                 }
             };
 
-            if !reservations.is_empty() {
+            if !bound_tasks.is_empty() {
                 self.store
                     .put(Keyspace::Slots, "all".to_owned(), slots.encode_to_vec())
                     .await?
             }
 
-            Ok(reservations)
+            Ok(bound_tasks)
         })
         .await
     }
 
-    async fn reserve_slots_exact(
-        &self,
-        num_slots: u32,
-        distribution: TaskDistribution,
-        executors: Option<HashSet<String>>,
-    ) -> Result<Vec<ExecutorReservation>> {
-        let lock = self.store.lock(Keyspace::Slots, "global").await?;
-
-        with_lock(lock, async {
-            let resources = self.store.get(Keyspace::Slots, "all").await?;
-
-            let mut slots =
-                ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
-                    BallistaError::Internal(format!(
-                        "Unexpected value in executor slots state: {err:?}"
-                    ))
-                })?;
-
-            let mut available_slots: Vec<&mut AvailableTaskSlots> = slots
-                .task_slots
-                .iter_mut()
-                .filter_map(|data| {
-                    (data.slots > 0
-                        && executors
-                            .as_ref()
-                            .map(|executors| executors.contains(&data.executor_id))
-                            .unwrap_or(true))
-                    .then_some(data)
-                })
-                .collect();
-
-            available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
-            let reservations = match distribution {
-                TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
-                TaskDistribution::RoundRobin => {
-                    reserve_slots_round_robin(available_slots, num_slots)
-                }
-            };
-
-            if reservations.len() == num_slots as usize {
-                self.store
-                    .put(Keyspace::Slots, "all".to_owned(), slots.encode_to_vec())
-                    .await?;
-                Ok(reservations)
-            } else {
-                Ok(vec![])
-            }
-        })
-        .await
-    }
+    async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
+        let mut increments = HashMap::new();
+        for (executor_id, num_slots) in executor_slots {
+            let v = increments.entry(executor_id).or_insert_with(|| 0);
+            *v += num_slots;
+        }
 
-    async fn cancel_reservations(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
         let lock = self.store.lock(Keyspace::Slots, "all").await?;
 
         with_lock(lock, async {
@@ -293,18 +245,9 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     ))
                 })?;
 
-            let mut increments = HashMap::new();
-            for ExecutorReservation { executor_id, .. } in reservations {
-                if let Some(inc) = increments.get_mut(&executor_id) {
-                    *inc += 1;
-                } else {
-                    increments.insert(executor_id, 1usize);
-                }
-            }
-
             for executor_slots in slots.task_slots.iter_mut() {
                 if let Some(slots) = increments.get(&executor_slots.executor_id) {
-                    executor_slots.slots += *slots as u32;
+                    executor_slots.slots += *slots;
                 }
             }
 
@@ -319,8 +262,7 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         &self,
         metadata: ExecutorMetadata,
         spec: ExecutorData,
-        reserve: bool,
-    ) -> Result<Vec<ExecutorReservation>> {
+    ) -> Result<()> {
         let executor_id = metadata.id.clone();
 
         //TODO this should be in a transaction
@@ -338,83 +280,40 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         })
         .await?;
 
-        if !reserve {
-            let available_slots = AvailableTaskSlots {
-                executor_id,
-                slots: spec.available_task_slots,
-            };
-
-            let lock = self.store.lock(Keyspace::Slots, "all").await?;
+        let available_slots = AvailableTaskSlots {
+            executor_id,
+            slots: spec.available_task_slots,
+        };
 
-            with_lock(lock, async {
-                let current_slots = self.store.get(Keyspace::Slots, "all").await?;
+        let lock = self.store.lock(Keyspace::Slots, "all").await?;
 
-                let mut current_slots: ExecutorTaskSlots =
-                    decode_protobuf(current_slots.as_slice())?;
+        with_lock(lock, async {
+            let current_slots = self.store.get(Keyspace::Slots, "all").await?;
 
-                if let Some((idx, _)) =
-                    current_slots.task_slots.iter().find_position(|slots| {
-                        slots.executor_id == available_slots.executor_id
-                    })
-                {
-                    current_slots.task_slots[idx] = available_slots;
-                } else {
-                    current_slots.task_slots.push(available_slots);
-                }
+            let mut current_slots: ExecutorTaskSlots =
+                decode_protobuf(current_slots.as_slice())?;
 
-                self.store
-                    .put(
-                        Keyspace::Slots,
-                        "all".to_string(),
-                        current_slots.encode_to_vec(),
-                    )
-                    .await
-            })
-            .await?;
-
-            Ok(vec![])
-        } else {
-            let num_slots = spec.available_task_slots as usize;
-            let mut reservations: Vec<ExecutorReservation> = vec![];
-            for _ in 0..num_slots {
-                reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+            if let Some((idx, _)) = current_slots
+                .task_slots
+                .iter()
+                .find_position(|slots| slots.executor_id == available_slots.executor_id)
+            {
+                current_slots.task_slots[idx] = available_slots;
+            } else {
+                current_slots.task_slots.push(available_slots);
             }
 
-            let available_slots = AvailableTaskSlots {
-                executor_id,
-                slots: 0,
-            };
-
-            let lock = self.store.lock(Keyspace::Slots, "all").await?;
-
-            with_lock(lock, async {
-                let current_slots = self.store.get(Keyspace::Slots, "all").await?;
-
-                let mut current_slots: ExecutorTaskSlots =
-                    decode_protobuf(current_slots.as_slice())?;
-
-                if let Some((idx, _)) =
-                    current_slots.task_slots.iter().find_position(|slots| {
-                        slots.executor_id == available_slots.executor_id
-                    })
-                {
-                    current_slots.task_slots[idx] = available_slots;
-                } else {
-                    current_slots.task_slots.push(available_slots);
-                }
-
-                self.store
-                    .put(
-                        Keyspace::Slots,
-                        "all".to_string(),
-                        current_slots.encode_to_vec(),
-                    )
-                    .await
-            })
-            .await?;
+            self.store
+                .put(
+                    Keyspace::Slots,
+                    "all".to_string(),
+                    current_slots.encode_to_vec(),
+                )
+                .await
+        })
+        .await?;
 
-            Ok(reservations)
-        }
+        Ok(())
     }
 
     async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
@@ -502,18 +401,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> JobState
     for KeyValueState<S, T, U>
 {
-    async fn accept_job(
-        &self,
-        job_id: &str,
-        job_name: &str,
-        queued_at: u64,
-    ) -> Result<()> {
+    fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
         self.queued_jobs
             .insert(job_id.to_string(), (job_name.to_string(), queued_at));
 
         Ok(())
     }
 
+    fn pending_job_number(&self) -> usize {
+        self.queued_jobs.len()
+    }
+
     async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()> {
         if self.queued_jobs.get(&job_id).is_some() {
             let status = graph.status();
@@ -774,13 +672,10 @@ async fn with_lock<Out, F: Future<Output = Out>>(mut lock: Box<dyn Lock>, op: F)
 
 #[cfg(test)]
 mod test {
+
     use crate::cluster::kv::KeyValueState;
     use crate::cluster::storage::sled::SledClient;
-    use crate::cluster::test::{
-        test_executor_registration, test_fuzz_reservations, test_job_lifecycle,
-        test_job_planning_failure, test_reservation,
-    };
-    use crate::cluster::TaskDistribution;
+    use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
     use crate::test_utils::{
         test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
     };
@@ -788,48 +683,6 @@ mod test {
     use ballista_core::serde::BallistaCodec;
     use ballista_core::utils::default_session_builder;
 
-    #[cfg(feature = "sled")]
-    fn make_sled_state() -> Result<KeyValueState<SledClient>> {
-        Ok(KeyValueState::new(
-            "",
-            SledClient::try_new_temporary()?,
-            BallistaCodec::default(),
-            default_session_builder,
-        ))
-    }
-
-    #[cfg(feature = "sled")]
-    #[tokio::test]
-    async fn test_sled_executor_reservation() -> Result<()> {
-        test_executor_registration(make_sled_state()?).await
-    }
-
-    #[cfg(feature = "sled")]
-    #[tokio::test]
-    async fn test_sled_reserve() -> Result<()> {
-        test_reservation(make_sled_state()?, TaskDistribution::Bias).await?;
-        test_reservation(make_sled_state()?, TaskDistribution::RoundRobin).await?;
-
-        Ok(())
-    }
-
-    #[cfg(feature = "sled")]
-    #[tokio::test]
-    async fn test_sled_fuzz_reserve() -> Result<()> {
-        test_fuzz_reservations(make_sled_state()?, 10, TaskDistribution::Bias, 10, 10)
-            .await?;
-        test_fuzz_reservations(
-            make_sled_state()?,
-            10,
-            TaskDistribution::RoundRobin,
-            10,
-            10,
-        )
-        .await?;
-
-        Ok(())
-    }
-
     #[cfg(feature = "sled")]
     #[tokio::test]
     async fn test_sled_job_lifecycle() -> Result<()> {
@@ -854,4 +707,14 @@ mod test {
 
         Ok(())
     }
+
+    #[cfg(feature = "sled")]
+    fn make_sled_state() -> Result<KeyValueState<SledClient>> {
+        Ok(KeyValueState::new(
+            "",
+            SledClient::try_new_temporary()?,
+            BallistaCodec::default(),
+            default_session_builder,
+        ))
+    }
 }
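
A detail worth noting in the unbind_tasks implementation above: the per-task slot returns are first collapsed into a single increment per executor, so the shared slot state is rewritten once per executor instead of once per task. A standalone sketch of just that aggregation step, assuming an ExecutorSlot alias of (executor id, slot count) like the one added to cluster/mod.rs further down:

    use std::collections::HashMap;

    // (executor id, number of slots to return); mirrors the ExecutorSlot
    // alias introduced in ballista/scheduler/src/cluster/mod.rs below.
    type ExecutorSlot = (String, u32);

    // Collapse many per-task slot returns into one increment per executor.
    fn aggregate_increments(executor_slots: Vec<ExecutorSlot>) -> HashMap<String, u32> {
        let mut increments: HashMap<String, u32> = HashMap::new();
        for (executor_id, num_slots) in executor_slots {
            *increments.entry(executor_id).or_insert(0) += num_slots;
        }
        increments
    }

    fn main() {
        let returned = vec![
            ("executor_1".to_string(), 1),
            ("executor_2".to_string(), 2),
            ("executor_1".to_string(), 3),
        ];
        // executor_1 is returned 4 slots in total, executor_2 gets 2
        println!("{:?}", aggregate_increments(returned));
    }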
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 9a8a6fc9..6060e79c 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -16,17 +16,16 @@
 // under the License.
 
 use crate::cluster::{
-    reserve_slots_bias, reserve_slots_round_robin, ClusterState, JobState, JobStateEvent,
-    JobStateEventStream, JobStatus, TaskDistribution,
+    bind_task_bias, bind_task_round_robin, BoundTask, ClusterState, ExecutorSlot,
+    JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
 };
 use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
 use async_trait::async_trait;
 use ballista_core::config::BallistaConfig;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::{
-    executor_status, AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus,
-    ExecutorTaskSlots, FailedJob, QueuedJob,
+    executor_status, AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus, FailedJob,
+    QueuedJob,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use dashmap::DashMap;
@@ -35,20 +34,20 @@ use datafusion::prelude::SessionContext;
 use crate::cluster::event::ClusterEventSender;
 use crate::scheduler_server::{timestamp_millis, timestamp_secs, SessionBuilder};
 use crate::state::session_manager::create_datafusion_context;
+use crate::state::task_manager::JobInfoCache;
 use ballista_core::serde::protobuf::job_status::Status;
-use itertools::Itertools;
 use log::warn;
-use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
 use std::ops::DerefMut;
 
 use std::sync::Arc;
+use tokio::sync::Mutex;
 use tracing::debug;
 
 #[derive(Default)]
 pub struct InMemoryClusterState {
     /// Current available task slots for each executor
-    task_slots: Mutex<ExecutorTaskSlots>,
+    task_slots: Mutex<HashMap<String, AvailableTaskSlots>>,
     /// Current executors
     executors: DashMap<String, ExecutorMetadata>,
     /// Last heartbeat received for each executor
@@ -57,17 +56,16 @@ pub struct InMemoryClusterState {
 
 #[async_trait]
 impl ClusterState for InMemoryClusterState {
-    async fn reserve_slots(
+    async fn bind_schedulable_tasks(
         &self,
-        num_slots: u32,
         distribution: TaskDistribution,
+        active_jobs: Arc<HashMap<String, JobInfoCache>>,
         executors: Option<HashSet<String>>,
-    ) -> Result<Vec<ExecutorReservation>> {
-        let mut guard = self.task_slots.lock();
+    ) -> Result<Vec<BoundTask>> {
+        let mut guard = self.task_slots.lock().await;
 
-        let mut available_slots: Vec<&mut AvailableTaskSlots> = guard
-            .task_slots
-            .iter_mut()
+        let available_slots: Vec<&mut AvailableTaskSlots> = guard
+            .values_mut()
             .filter_map(|data| {
                 (data.slots > 0
                     && executors
@@ -78,76 +76,30 @@ impl ClusterState for InMemoryClusterState {
             })
             .collect();
 
-        available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
-        let reservations = match distribution {
-            TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
-            TaskDistribution::RoundRobin => {
-                reserve_slots_round_robin(available_slots, num_slots)
+        let schedulable_tasks = match distribution {
+            TaskDistribution::Bias => {
+                bind_task_bias(available_slots, active_jobs, |_| false).await
             }
-        };
-
-        Ok(reservations)
-    }
-
-    async fn reserve_slots_exact(
-        &self,
-        num_slots: u32,
-        distribution: TaskDistribution,
-        executors: Option<HashSet<String>>,
-    ) -> Result<Vec<ExecutorReservation>> {
-        let mut guard = self.task_slots.lock();
-
-        let rollback = guard.clone();
-
-        let mut available_slots: Vec<&mut AvailableTaskSlots> = guard
-            .task_slots
-            .iter_mut()
-            .filter_map(|data| {
-                (data.slots > 0
-                    && executors
-                        .as_ref()
-                        .map(|executors| executors.contains(&data.executor_id))
-                        .unwrap_or(true))
-                .then_some(data)
-            })
-            .collect();
-
-        available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
-        let reservations = match distribution {
-            TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
             TaskDistribution::RoundRobin => {
-                reserve_slots_round_robin(available_slots, num_slots)
+                bind_task_round_robin(available_slots, active_jobs, |_| false).await
             }
         };
 
-        if reservations.len() as u32 != num_slots {
-            *guard = rollback;
-            Ok(vec![])
-        } else {
-            Ok(reservations)
-        }
+        Ok(schedulable_tasks)
     }
 
-    async fn cancel_reservations(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
+    async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
         let mut increments = HashMap::new();
-        for ExecutorReservation { executor_id, .. } in reservations {
-            if let Some(inc) = increments.get_mut(&executor_id) {
-                *inc += 1;
-            } else {
-                increments.insert(executor_id, 1usize);
-            }
+        for (executor_id, num_slots) in executor_slots {
+            let v = increments.entry(executor_id).or_insert_with(|| 0);
+            *v += num_slots;
         }
 
-        let mut guard = self.task_slots.lock();
+        let mut guard = self.task_slots.lock().await;
 
-        for executor_slots in guard.task_slots.iter_mut() {
-            if let Some(slots) = increments.get(&executor_slots.executor_id) {
-                executor_slots.slots += *slots as u32;
+        for (executor_id, num_slots) in increments {
+            if let Some(mut data) = guard.get_mut(&executor_id) {
+                data.slots += num_slots;
             }
         }
 
@@ -157,9 +109,8 @@ impl ClusterState for InMemoryClusterState {
     async fn register_executor(
         &self,
         metadata: ExecutorMetadata,
-        mut spec: ExecutorData,
-        reserve: bool,
-    ) -> Result<Vec<ExecutorReservation>> {
+        spec: ExecutorData,
+    ) -> Result<()> {
         let executor_id = metadata.id.clone();
 
         self.save_executor_metadata(metadata).await?;
@@ -173,37 +124,17 @@ impl ClusterState for InMemoryClusterState {
         })
         .await?;
 
-        let mut guard = self.task_slots.lock();
-
-        // Check to see if we already have task slots for executor. If so, remove them.
-        if let Some((idx, _)) = guard
-            .task_slots
-            .iter()
-            .find_position(|slots| slots.executor_id == executor_id)
-        {
-            guard.task_slots.swap_remove(idx);
-        }
-
-        if reserve {
-            let slots = std::mem::take(&mut spec.available_task_slots) as usize;
-            let reservations = (0..slots)
-                .map(|_| ExecutorReservation::new_free(executor_id.clone()))
-                .collect();
+        let mut guard = self.task_slots.lock().await;
 
-            guard.task_slots.push(AvailableTaskSlots {
-                executor_id,
-                slots: 0,
-            });
-
-            Ok(reservations)
-        } else {
-            guard.task_slots.push(AvailableTaskSlots {
+        guard.insert(
+            executor_id.clone(),
+            AvailableTaskSlots {
                 executor_id,
                 slots: spec.available_task_slots,
-            });
+            },
+        );
 
-            Ok(vec![])
-        }
+        Ok(())
     }
 
     async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
@@ -235,15 +166,9 @@ impl ClusterState for InMemoryClusterState {
 
     async fn remove_executor(&self, executor_id: &str) -> Result<()> {
         {
-            let mut guard = self.task_slots.lock();
-
-            if let Some((idx, _)) = guard
-                .task_slots
-                .iter()
-                .find_position(|slots| slots.executor_id == executor_id)
-            {
-                guard.task_slots.swap_remove(idx);
-            }
+            let mut guard = self.task_slots.lock().await;
+
+            guard.remove(executor_id);
         }
 
         self.heartbeats.remove(executor_id);
@@ -299,7 +224,8 @@ impl InMemoryJobState {
 impl JobState for InMemoryJobState {
     async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()> {
         if self.queued_jobs.get(&job_id).is_some() {
-            self.running_jobs.insert(job_id.clone(), graph.status());
+            self.running_jobs
+                .insert(job_id.clone(), graph.status().clone());
             self.queued_jobs.remove(&job_id);
 
             self.job_event_sender.send(&JobStateEvent::JobAcquired {
@@ -352,7 +278,7 @@ impl JobState for InMemoryJobState {
     }
 
     async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> Result<()> {
-        let status = graph.status();
+        let status = graph.status().clone();
 
         debug!("saving state for job {job_id} with status {:?}", status);
 
@@ -365,7 +291,7 @@ impl JobState for InMemoryJobState {
                 .insert(job_id.to_string(), (status, Some(graph.clone())));
             self.running_jobs.remove(job_id);
         } else if let Some(old_status) =
-            self.running_jobs.insert(job_id.to_string(), graph.status())
+            self.running_jobs.insert(job_id.to_string(), status)
         {
             self.job_event_sender.send(&JobStateEvent::JobUpdated {
                 job_id: job_id.to_string(),
@@ -433,18 +359,17 @@ impl JobState for InMemoryJobState {
             .collect())
     }
 
-    async fn accept_job(
-        &self,
-        job_id: &str,
-        job_name: &str,
-        queued_at: u64,
-    ) -> Result<()> {
+    fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
         self.queued_jobs
             .insert(job_id.to_string(), (job_name.to_string(), queued_at));
 
         Ok(())
     }
 
+    fn pending_job_number(&self) -> usize {
+        self.queued_jobs.len()
+    }
+
     async fn fail_unscheduled_job(&self, job_id: &str, reason: String) -> Result<()> {
         if let Some((job_id, (job_name, queued_at))) = self.queued_jobs.remove(job_id) {
             self.completed_jobs.insert(
@@ -475,57 +400,14 @@ impl JobState for InMemoryJobState {
 
 #[cfg(test)]
 mod test {
-    use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
-    use crate::cluster::test::{
-        test_executor_registration, test_fuzz_reservations, test_job_lifecycle,
-        test_job_planning_failure, test_reservation,
-    };
-    use crate::cluster::TaskDistribution;
+    use crate::cluster::memory::InMemoryJobState;
+    use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
     use crate::test_utils::{
         test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
     };
     use ballista_core::error::Result;
     use ballista_core::utils::default_session_builder;
 
-    #[tokio::test]
-    async fn test_in_memory_registration() -> Result<()> {
-        test_executor_registration(InMemoryClusterState::default()).await
-    }
-
-    #[tokio::test]
-    async fn test_in_memory_reserve() -> Result<()> {
-        test_reservation(InMemoryClusterState::default(), TaskDistribution::Bias).await?;
-        test_reservation(
-            InMemoryClusterState::default(),
-            TaskDistribution::RoundRobin,
-        )
-        .await?;
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_in_memory_fuzz_reserve() -> Result<()> {
-        test_fuzz_reservations(
-            InMemoryClusterState::default(),
-            10,
-            TaskDistribution::Bias,
-            10,
-            10,
-        )
-        .await?;
-        test_fuzz_reservations(
-            InMemoryClusterState::default(),
-            10,
-            TaskDistribution::RoundRobin,
-            10,
-            10,
-        )
-        .await?;
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_in_memory_job_lifecycle() -> Result<()> {
         test_job_lifecycle(
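
The in-memory state above also swaps parking_lot::Mutex<ExecutorTaskSlots> for tokio::sync::Mutex<HashMap<String, AvailableTaskSlots>>, presumably because bind_schedulable_tasks now holds the guard across the .await on bind_task_bias / bind_task_round_robin, which only an async-aware mutex supports cleanly. A minimal sketch of that pattern, using a plain u32 slot count instead of AvailableTaskSlots and assuming the tokio crate (already a scheduler dependency) is available:

    use std::collections::HashMap;
    use tokio::sync::Mutex;

    #[derive(Default)]
    struct SlotState {
        // Keyed by executor id, like the new InMemoryClusterState::task_slots.
        task_slots: Mutex<HashMap<String, u32>>,
    }

    impl SlotState {
        // The tokio MutexGuard may be held across .await points, unlike a
        // synchronous parking_lot guard in a Send future.
        async fn return_slots(&self, executor_id: &str, n: u32) {
            let mut guard = self.task_slots.lock().await;
            *guard.entry(executor_id.to_string()).or_insert(0) += n;
        }
    }

    #[tokio::main]
    async fn main() {
        let state = SlotState::default();
        state.return_slots("executor_1", 2).await;
        state.return_slots("executor_1", 1).await;
        println!("{:?}", state.task_slots.lock().await);
    }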
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 0d8ae900..87680ac0 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -22,7 +22,7 @@ pub mod storage;
 
 #[cfg(test)]
 #[allow(clippy::uninlined_format_args)]
-pub mod test;
+pub mod test_util;
 
 use crate::cluster::kv::KeyValueState;
 use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
@@ -31,20 +31,23 @@ use crate::cluster::storage::sled::SledClient;
 use crate::cluster::storage::KeyValueStore;
 use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
 use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
+use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
+use crate::state::task_manager::JobInfoCache;
 use ballista_core::config::BallistaConfig;
 use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::protobuf::{AvailableTaskSlots, ExecutorHeartbeat, JobStatus};
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use ballista_core::serde::protobuf::{
+    job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus,
+};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, PartitionId};
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::default_session_builder;
 use clap::ArgEnum;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use futures::Stream;
-use log::info;
+use log::{debug, info, warn};
 use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::pin::Pin;
@@ -195,6 +198,13 @@ impl BallistaCluster {
 /// by any schedulers with a shared `ClusterState`
 pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item = ExecutorHeartbeat> + Send>>;
 
+/// A task bound to an executor for execution.
+/// BoundTask.0 is the executor id, while BoundTask.1 is the task description.
+pub type BoundTask = (String, TaskDescription);
+
+/// ExecutorSlot.0 is the executor id, while ExecutorSlot.1 is the slot count.
+pub type ExecutorSlot = (String, u32);
+
 /// A trait that contains the necessary method to maintain a globally consistent view of cluster resources
 #[tonic::async_trait]
 pub trait ClusterState: Send + Sync + 'static {
@@ -203,45 +213,28 @@ pub trait ClusterState: Send + Sync + 'static {
         Ok(())
     }
 
-    /// Reserve up to `num_slots` executor task slots. If not enough task slots are available, reserve
-    /// as many as possible.
+    /// Bind the tasks that are ready to run from [`active_jobs`] to available executors.
     ///
-    /// If `executors` is provided, only reserve slots of the specified executor IDs
-    async fn reserve_slots(
+    /// If `executors` is provided, only bind slots from the specified executor IDs
+    async fn bind_schedulable_tasks(
         &self,
-        num_slots: u32,
         distribution: TaskDistribution,
+        active_jobs: Arc<HashMap<String, JobInfoCache>>,
         executors: Option<HashSet<String>>,
-    ) -> Result<Vec<ExecutorReservation>>;
+    ) -> Result<Vec<BoundTask>>;
 
-    /// Reserve exactly `num_slots` executor task slots. If not enough task slots are available,
-    /// returns an empty vec
+    /// Unbind tasks from their executors when they finish or fail, returning the
+    /// freed slots to the executors' available task slot counts.
     ///
-    /// If `executors` is provided, only reserve slots of the specified executor IDs
-    async fn reserve_slots_exact(
-        &self,
-        num_slots: u32,
-        distribution: TaskDistribution,
-        executors: Option<HashSet<String>>,
-    ) -> Result<Vec<ExecutorReservation>>;
-
-    /// Cancel the specified reservations. This will make reserved executor slots available to other
-    /// tasks.
     /// This operation should be atomic. Either all reservations are cancelled or none are
-    async fn cancel_reservations(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()>;
+    async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()>;
 
-    /// Register a new executor in the cluster. If `reserve` is true, then the executors task slots
-    /// will be reserved and returned in the response and none of the new executors task slots will be
-    /// available to other tasks.
+    /// Register a new executor in the cluster.
     async fn register_executor(
         &self,
         metadata: ExecutorMetadata,
         spec: ExecutorData,
-        reserve: bool,
-    ) -> Result<Vec<ExecutorReservation>>;
+    ) -> Result<()>;
 
     /// Save the executor metadata. This will overwrite existing metadata for the executor ID
     async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()>;
@@ -309,12 +302,11 @@ pub trait JobState: Send + Sync {
     /// Accept a job into a scheduler's job queue. This should be called when a job is
     /// received by the scheduler but before it is planned and may or may not be saved
     /// in global state
-    async fn accept_job(
-        &self,
-        job_id: &str,
-        job_name: &str,
-        queued_at: u64,
-    ) -> Result<()>;
+    fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()>;
+
+    /// Get the number of queued jobs. A large number indicates the scheduler is too busy;
+    /// in the normal case it should be close to 0.
+    fn pending_job_number(&self) -> usize;
 
     /// Submit a new job to the `JobState`. It is assumed that the submitter owns the job.
     /// In local state the job should be save as `JobStatus::Active` and in shared state
@@ -376,66 +368,382 @@ pub trait JobState: Send + Sync {
     ) -> Result<Option<Arc<SessionContext>>>;
 }
 
-pub(crate) fn reserve_slots_bias(
+pub(crate) async fn bind_task_bias(
     mut slots: Vec<&mut AvailableTaskSlots>,
-    mut n: u32,
-) -> Vec<ExecutorReservation> {
-    let mut reservations = Vec::with_capacity(n as usize);
-
-    let mut iter = slots.iter_mut();
-
-    while n > 0 {
-        if let Some(executor) = iter.next() {
-            let take = executor.slots.min(n);
-            for _ in 0..take {
-                reservations
-                    .push(ExecutorReservation::new_free(executor.executor_id.clone()));
-            }
+    active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
+) -> Vec<BoundTask> {
+    let mut schedulable_tasks: Vec<BoundTask> = vec![];
+
+    let total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
+    if total_slots == 0 {
+        warn!("Not enough available executor slots for task running!!!");
+        return schedulable_tasks;
+    }
 
-            executor.slots -= take;
-            n -= take;
-        } else {
-            break;
+    // Sort the slots in descending order
+    slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
+
+    let mut idx_slot = 0usize;
+    let mut slot = &mut slots[idx_slot];
+    for (job_id, job_info) in active_jobs.iter() {
+        if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
+            debug!(
+                "Job {} is not in running status and will be skipped",
+                job_id
+            );
+            continue;
+        }
+        let mut graph = job_info.execution_graph.write().await;
+        let session_id = graph.session_id().to_string();
+        let mut black_list = vec![];
+        while let Some((running_stage, task_id_gen)) =
+            graph.fetch_running_stage(&black_list)
+        {
+            if if_skip(running_stage.plan.clone()) {
+                info!(
+                    "Will skip stage {}/{} for bias task binding",
+                    job_id, running_stage.stage_id
+                );
+                black_list.push(running_stage.stage_id);
+                continue;
+            }
+            // The following logic is guaranteed to bind at least one task,
+            // so this loop cannot spin forever.
+            let runnable_tasks = running_stage
+                .task_infos
+                .iter_mut()
+                .enumerate()
+                .filter(|(_partition, info)| info.is_none())
+                .take(total_slots as usize)
+                .collect::<Vec<_>>();
+            for (partition_id, task_info) in runnable_tasks {
+                // Advance [`slot`] to an executor whose available slot number is larger than 0
+                while slot.slots == 0 {
+                    idx_slot += 1;
+                    if idx_slot >= slots.len() {
+                        return schedulable_tasks;
+                    }
+                    slot = &mut slots[idx_slot];
+                }
+                let executor_id = slot.executor_id.clone();
+                let task_id = *task_id_gen;
+                *task_id_gen += 1;
+                *task_info = Some(create_task_info(executor_id.clone(), task_id));
+
+                let partition = PartitionId {
+                    job_id: job_id.clone(),
+                    stage_id: running_stage.stage_id,
+                    partition_id,
+                };
+                let task_desc = TaskDescription {
+                    session_id: session_id.clone(),
+                    partition,
+                    stage_attempt_num: running_stage.stage_attempt_num,
+                    task_id,
+                    task_attempt: running_stage.task_failure_numbers[partition_id],
+                    plan: running_stage.plan.clone(),
+                };
+                schedulable_tasks.push((executor_id, task_desc));
+
+                slot.slots -= 1;
+            }
         }
     }
 
-    reservations
+    schedulable_tasks
 }
 
-pub(crate) fn reserve_slots_round_robin(
+pub(crate) async fn bind_task_round_robin(
     mut slots: Vec<&mut AvailableTaskSlots>,
-    mut n: u32,
-) -> Vec<ExecutorReservation> {
-    let mut reservations = Vec::with_capacity(n as usize);
+    active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
+) -> Vec<BoundTask> {
+    let mut schedulable_tasks: Vec<BoundTask> = vec![];
+
+    let mut total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
+    if total_slots == 0 {
+        warn!("Not enough available executor slots for task running!!!");
+        return schedulable_tasks;
+    }
+    info!("Total slot number is {}", total_slots);
+
+    // Sort the slots in descending order
+    slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
+
+    let mut idx_slot = 0usize;
+    for (job_id, job_info) in active_jobs.iter() {
+        if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
+            debug!(
+                "Job {} is not in running status and will be skipped",
+                job_id
+            );
+            continue;
+        }
+        let mut graph = job_info.execution_graph.write().await;
+        let session_id = graph.session_id().to_string();
+        let mut black_list = vec![];
+        while let Some((running_stage, task_id_gen)) =
+            graph.fetch_running_stage(&black_list)
+        {
+            if if_skip(running_stage.plan.clone()) {
+                info!(
+                    "Will skip stage {}/{} for round robin task binding",
+                    job_id, running_stage.stage_id
+                );
+                black_list.push(running_stage.stage_id);
+                continue;
+            }
+            // The following logic is guaranteed to bind at least one task,
+            // so this loop cannot spin forever.
+            let runnable_tasks = running_stage
+                .task_infos
+                .iter_mut()
+                .enumerate()
+                .filter(|(_partition, info)| info.is_none())
+                .take(total_slots as usize)
+                .collect::<Vec<_>>();
+            for (partition_id, task_info) in runnable_tasks {
+                // Move to the index which has available slots
+                if idx_slot >= slots.len() {
+                    idx_slot = 0;
+                }
+                if slots[idx_slot].slots == 0 {
+                    idx_slot = 0;
+                }
+                // Since the slots vector is sorted in descending order and the total number of
+                // available slots is larger than 0, the slot count at idx_slot is at least 1
+                let mut slot = &mut slots[idx_slot];
+                let executor_id = slot.executor_id.clone();
+                let task_id = *task_id_gen;
+                *task_id_gen += 1;
+                *task_info = Some(create_task_info(executor_id.clone(), task_id));
+
+                let partition = PartitionId {
+                    job_id: job_id.clone(),
+                    stage_id: running_stage.stage_id,
+                    partition_id,
+                };
+                let task_desc = TaskDescription {
+                    session_id: session_id.clone(),
+                    partition,
+                    stage_attempt_num: running_stage.stage_attempt_num,
+                    task_id,
+                    task_attempt: running_stage.task_failure_numbers[partition_id],
+                    plan: running_stage.plan.clone(),
+                };
+                schedulable_tasks.push((executor_id, task_desc));
+
+                idx_slot += 1;
+                slot.slots -= 1;
+                total_slots -= 1;
+                if total_slots == 0 {
+                    return schedulable_tasks;
+                }
+            }
+        }
+    }
 
-    let mut last_updated_idx = 0usize;
+    schedulable_tasks
+}
 
-    loop {
-        let n_before = n;
-        for (idx, data) in slots.iter_mut().enumerate() {
-            if n == 0 {
-                break;
-            }
+#[cfg(test)]
+mod test {
+    use crate::cluster::{bind_task_bias, bind_task_round_robin, BoundTask};
+    use crate::state::execution_graph::ExecutionGraph;
+    use crate::state::task_manager::JobInfoCache;
+    use crate::test_utils::{mock_completed_task, test_aggregation_plan_with_job_id};
+    use ballista_core::error::Result;
+    use ballista_core::serde::protobuf::AvailableTaskSlots;
+    use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_bind_task_bias() -> Result<()> {
+        let active_jobs = mock_active_jobs().await?;
+        let mut available_slots = mock_available_slots();
+        let available_slots_ref: Vec<&mut AvailableTaskSlots> =
+            available_slots.iter_mut().collect();
+
+        let bound_tasks =
+            bind_task_bias(available_slots_ref, Arc::new(active_jobs), |_| false).await;
+        assert_eq!(9, bound_tasks.len());
+
+        let result = get_result(bound_tasks);
+
+        let mut expected = Vec::new();
+        {
+            let mut expected0 = HashMap::new();
+
+            let mut entry_a = HashMap::new();
+            entry_a.insert("executor_3".to_string(), 2);
+            let mut entry_b = HashMap::new();
+            entry_b.insert("executor_3".to_string(), 5);
+            entry_b.insert("executor_2".to_string(), 2);
+
+            expected0.insert("job_a".to_string(), entry_a);
+            expected0.insert("job_b".to_string(), entry_b);
+
+            expected.push(expected0);
+        }
+        {
+            let mut expected0 = HashMap::new();
 
-            // Since the vector is sorted in descending order,
-            // if finding one executor has not enough slots, the following will have not enough, either
-            if data.slots == 0 {
-                break;
-            }
+            let mut entry_b = HashMap::new();
+            entry_b.insert("executor_3".to_string(), 7);
+            let mut entry_a = HashMap::new();
+            entry_a.insert("executor_2".to_string(), 2);
 
-            reservations.push(ExecutorReservation::new_free(data.executor_id.clone()));
-            data.slots -= 1;
-            n -= 1;
+            expected0.insert("job_a".to_string(), entry_a);
+            expected0.insert("job_b".to_string(), entry_b);
 
-            if idx >= last_updated_idx {
-                last_updated_idx = idx + 1;
-            }
+            expected.push(expected0);
         }
 
-        if n_before == n {
-            break;
+        assert!(
+            expected.contains(&result),
+            "The result {:?} is not as expected {:?}",
+            result,
+            expected
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_bind_task_round_robin() -> Result<()> {
+        let active_jobs = mock_active_jobs().await?;
+        let mut available_slots = mock_available_slots();
+        let available_slots_ref: Vec<&mut AvailableTaskSlots> =
+            available_slots.iter_mut().collect();
+
+        let bound_tasks =
+            bind_task_round_robin(available_slots_ref, Arc::new(active_jobs), |_| false)
+                .await;
+        assert_eq!(9, bound_tasks.len());
+
+        let result = get_result(bound_tasks);
+
+        let mut expected = Vec::new();
+        {
+            let mut expected0 = HashMap::new();
+
+            let mut entry_a = HashMap::new();
+            entry_a.insert("executor_3".to_string(), 1);
+            entry_a.insert("executor_2".to_string(), 1);
+            let mut entry_b = HashMap::new();
+            entry_b.insert("executor_1".to_string(), 3);
+            entry_b.insert("executor_3".to_string(), 2);
+            entry_b.insert("executor_2".to_string(), 2);
+
+            expected0.insert("job_a".to_string(), entry_a);
+            expected0.insert("job_b".to_string(), entry_b);
+
+            expected.push(expected0);
+        }
+        {
+            let mut expected0 = HashMap::new();
+
+            let mut entry_b = HashMap::new();
+            entry_b.insert("executor_3".to_string(), 3);
+            entry_b.insert("executor_2".to_string(), 2);
+            entry_b.insert("executor_1".to_string(), 2);
+            let mut entry_a = HashMap::new();
+            entry_a.insert("executor_2".to_string(), 1);
+            entry_a.insert("executor_1".to_string(), 1);
+
+            expected0.insert("job_a".to_string(), entry_a);
+            expected0.insert("job_b".to_string(), entry_b);
+
+            expected.push(expected0);
         }
+
+        assert!(
+            expected.contains(&result),
+            "The result {:?} is not as expected {:?}",
+            result,
+            expected
+        );
+
+        Ok(())
     }
 
-    reservations
+    fn get_result(
+        bound_tasks: Vec<BoundTask>,
+    ) -> HashMap<String, HashMap<String, usize>> {
+        let mut result = HashMap::new();
+
+        for bound_task in bound_tasks {
+            let entry = result
+                .entry(bound_task.1.partition.job_id)
+                .or_insert_with(HashMap::new);
+            let n = entry.entry(bound_task.0).or_insert_with(|| 0);
+            *n += 1;
+        }
+
+        result
+    }
+
+    async fn mock_active_jobs() -> Result<HashMap<String, JobInfoCache>> {
+        let num_partition = 8usize;
+
+        let graph_a = mock_graph("job_a", num_partition, 2).await?;
+
+        let graph_b = mock_graph("job_b", num_partition, 7).await?;
+
+        let mut active_jobs = HashMap::new();
+        active_jobs.insert(graph_a.job_id().to_string(), JobInfoCache::new(graph_a));
+        active_jobs.insert(graph_b.job_id().to_string(), JobInfoCache::new(graph_b));
+
+        Ok(active_jobs)
+    }
+
+    async fn mock_graph(
+        job_id: &str,
+        num_partition: usize,
+        num_pending_task: usize,
+    ) -> Result<ExecutionGraph> {
+        let mut graph = test_aggregation_plan_with_job_id(num_partition, job_id).await;
+        let executor = ExecutorMetadata {
+            id: "executor_0".to_string(),
+            host: "localhost".to_string(),
+            port: 50051,
+            grpc_port: 50052,
+            specification: ExecutorSpecification { task_slots: 32 },
+        };
+
+        if let Some(task) = graph.pop_next_task(&executor.id)? {
+            let task_status = mock_completed_task(task, &executor.id);
+            graph.update_task_status(&executor, vec![task_status], 1, 1)?;
+        }
+
+        graph.revive();
+
+        for _i in 0..num_partition - num_pending_task {
+            if let Some(task) = graph.pop_next_task(&executor.id)? {
+                let task_status = mock_completed_task(task, &executor.id);
+                graph.update_task_status(&executor, vec![task_status], 1, 1)?;
+            }
+        }
+
+        Ok(graph)
+    }
+
+    fn mock_available_slots() -> Vec<AvailableTaskSlots> {
+        vec![
+            AvailableTaskSlots {
+                executor_id: "executor_1".to_string(),
+                slots: 3,
+            },
+            AvailableTaskSlots {
+                executor_id: "executor_2".to_string(),
+                slots: 5,
+            },
+            AvailableTaskSlots {
+                executor_id: "executor_3".to_string(),
+                slots: 7,
+            },
+        ]
+    }
 }
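
For the round-robin distribution added in this module, the binding loop spreads tasks one slot at a time across executors, wrapping around until every slot is consumed. The function below is a simplified, self-contained sketch of just that spreading behaviour; it is not the real bind_task_round_robin, which additionally walks the running stages of active jobs, generates task ids, and restarts from the front of the descending-sorted slot vector when it hits an exhausted executor.

    // Assign up to num_tasks tasks, one slot at a time, cycling through the
    // executors. Returns the executor id chosen for each task, in order.
    fn round_robin_assign(mut slots: Vec<(String, u32)>, num_tasks: u32) -> Vec<String> {
        // Highest-capacity executors first, as in the scheduler.
        slots.sort_by(|a, b| b.1.cmp(&a.1));
        let mut assigned = Vec::new();
        let mut remaining: u32 = slots.iter().map(|s| s.1).sum();
        let mut idx = 0usize;
        for _ in 0..num_tasks {
            if remaining == 0 {
                break; // every slot is already used; the rest stay pending
            }
            // Find the next executor with a free slot, wrapping around; this
            // terminates because remaining > 0 guarantees one exists.
            while slots[idx % slots.len()].1 == 0 {
                idx += 1;
            }
            let i = idx % slots.len();
            slots[i].1 -= 1;
            remaining -= 1;
            assigned.push(slots[i].0.clone());
            idx += 1;
        }
        assigned
    }

    fn main() {
        let slots = vec![
            ("executor_1".to_string(), 3),
            ("executor_2".to_string(), 5),
            ("executor_3".to_string(), 7),
        ];
        // With 9 tasks the assignment cycles executor_3, executor_2, executor_1, ...
        println!("{:?}", round_robin_assign(slots, 9));
    }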
diff --git a/ballista/scheduler/src/cluster/storage/mod.rs b/ballista/scheduler/src/cluster/storage/mod.rs
index f4080468..6aa049c0 100644
--- a/ballista/scheduler/src/cluster/storage/mod.rs
+++ b/ballista/scheduler/src/cluster/storage/mod.rs
@@ -114,17 +114,6 @@ pub trait KeyValueStore: Send + Sync + Clone + 'static {
     async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()>;
 }
 
-/// Method of distributing tasks to available executor slots
-#[derive(Debug, Clone, Copy)]
-pub enum TaskDistribution {
-    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
-    /// as are currently available
-    Bias,
-    /// Distributed tasks evenely across executors. This will try and iterate through available executors
-    /// and assign one task to each executor until all tasks are assigned.
-    RoundRobin,
-}
-
 /// A Watch is a cancelable stream of put or delete events in the [StateBackendClient]
 #[async_trait]
 pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
diff --git a/ballista/scheduler/src/cluster/test/mod.rs b/ballista/scheduler/src/cluster/test/mod.rs
deleted file mode 100644
index b9056bfd..00000000
--- a/ballista/scheduler/src/cluster/test/mod.rs
+++ /dev/null
@@ -1,587 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cluster::{ClusterState, JobState, JobStateEvent, TaskDistribution};
-use crate::scheduler_server::timestamp_millis;
-use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
-use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
-use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{executor_status, JobStatus};
-use ballista_core::serde::scheduler::{
-    ExecutorData, ExecutorMetadata, ExecutorSpecification,
-};
-use futures::StreamExt;
-use itertools::Itertools;
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::RwLock;
-
-pub struct ClusterStateTest<S: ClusterState> {
-    state: Arc<S>,
-    reservations: Vec<ExecutorReservation>,
-    total_task_slots: u32,
-}
-
-impl<S: ClusterState> ClusterStateTest<S> {
-    pub async fn new(state: S) -> Result<Self> {
-        Ok(Self {
-            state: Arc::new(state),
-            reservations: vec![],
-            total_task_slots: 0,
-        })
-    }
-
-    pub async fn register_executor(
-        mut self,
-        executor_id: &str,
-        task_slots: u32,
-    ) -> Result<Self> {
-        self.state
-            .register_executor(
-                ExecutorMetadata {
-                    id: executor_id.to_string(),
-                    host: executor_id.to_string(),
-                    port: 0,
-                    grpc_port: 0,
-                    specification: ExecutorSpecification { task_slots },
-                },
-                ExecutorData {
-                    executor_id: executor_id.to_string(),
-                    total_task_slots: task_slots,
-                    available_task_slots: task_slots,
-                },
-                false,
-            )
-            .await?;
-
-        self.total_task_slots += task_slots;
-
-        Ok(self)
-    }
-
-    pub async fn remove_executor(self, executor_id: &str) -> Result<Self> {
-        self.state.remove_executor(executor_id).await?;
-
-        Ok(self)
-    }
-
-    pub async fn assert_live_executor(
-        self,
-        executor_id: &str,
-        task_slots: u32,
-    ) -> Result<Self> {
-        let executor = self.state.get_executor_metadata(executor_id).await;
-        assert!(
-            executor.is_ok(),
-            "Metadata for executor {} not found in state",
-            executor_id
-        );
-        assert_eq!(
-            executor.unwrap().specification.task_slots,
-            task_slots,
-            "Unexpected number of task slots for executor"
-        );
-
-        // Heratbeat stream is async so wait up to 500ms for it to show up
-        await_condition(Duration::from_millis(50), 10, || {
-            let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or(
-                false,
-                |heartbeat| {
-                    matches!(
-                        heartbeat.status,
-                        Some(ballista_core::serde::generated::ballista::ExecutorStatus {
-                            status: Some(executor_status::Status::Active(_))
-                        })
-                    )
-                },
-            );
-
-            futures::future::ready(Ok(found_heartbeat))
-        })
-        .await?;
-
-        Ok(self)
-    }
-
-    pub async fn assert_dead_executor(self, executor_id: &str) -> Result<Self> {
-        // Heratbeat stream is async so wait up to 500ms for it to show up
-        await_condition(Duration::from_millis(50), 10, || {
-            let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or(
-                true,
-                |heartbeat| {
-                    matches!(
-                        heartbeat.status,
-                        Some(ballista_core::serde::generated::ballista::ExecutorStatus {
-                            status: Some(executor_status::Status::Dead(_))
-                        })
-                    )
-                },
-            );
-
-            futures::future::ready(Ok(found_heartbeat))
-        })
-        .await?;
-
-        Ok(self)
-    }
-
-    pub async fn try_reserve_slots(
-        mut self,
-        num_slots: u32,
-        distribution: TaskDistribution,
-        filter: Option<Vec<String>>,
-        exact: bool,
-    ) -> Result<Self> {
-        let filter = filter.map(|f| f.into_iter().collect::<HashSet<String>>());
-        let reservations = if exact {
-            self.state
-                .reserve_slots_exact(num_slots, distribution, filter)
-                .await?
-        } else {
-            self.state
-                .reserve_slots(num_slots, distribution, filter)
-                .await?
-        };
-
-        self.reservations.extend(reservations);
-
-        Ok(self)
-    }
-
-    pub async fn cancel_reservations(mut self, num_slots: usize) -> Result<Self> {
-        if self.reservations.len() < num_slots {
-            return Err(BallistaError::General(format!(
-                "Not enough reservations to cancel, expected {} but found {}",
-                num_slots,
-                self.reservations.len()
-            )));
-        }
-
-        let to_keep = self.reservations.split_off(num_slots);
-
-        self.state
-            .cancel_reservations(std::mem::take(&mut self.reservations))
-            .await?;
-
-        self.reservations = to_keep;
-
-        Ok(self)
-    }
-
-    pub fn assert_open_reservations(self, n: usize) -> Self {
-        assert_eq!(
-            self.reservations.len(),
-            n,
-            "Expectedt {} open reservations but found {}",
-            n,
-            self.reservations.len()
-        );
-        self
-    }
-
-    pub fn assert_open_reservations_with<F: Fn(&ExecutorReservation) -> bool>(
-        self,
-        n: usize,
-        predicate: F,
-    ) -> Self {
-        assert_eq!(
-            self.reservations.len(),
-            n,
-            "Expected {} open reservations but found {}",
-            n,
-            self.reservations.len()
-        );
-
-        for res in &self.reservations {
-            assert!(predicate(res), "Predicate failed on reservation {:?}", res);
-        }
-        self
-    }
-
-    pub async fn fuzz_reservation(
-        mut self,
-        concurrency: usize,
-        distribution: TaskDistribution,
-    ) -> Result<()> {
-        let (sender, mut receiver) = tokio::sync::mpsc::channel(1_000);
-
-        let total_slots = self.total_task_slots;
-        for _ in 0..concurrency {
-            let state = self.state.clone();
-            let sender_clone = sender.clone();
-            tokio::spawn(async move {
-                let mut open_reservations = vec![];
-                for i in 0..10 {
-                    if i % 2 == 0 {
-                        let to_reserve = rand::random::<u32>() % total_slots;
-
-                        let reservations = state
-                            .reserve_slots(to_reserve, distribution, None)
-                            .await
-                            .unwrap();
-
-                        sender_clone
-                            .send(FuzzEvent::Reserved(reservations.clone()))
-                            .await
-                            .unwrap();
-
-                        open_reservations = reservations;
-                    } else {
-                        state
-                            .cancel_reservations(open_reservations.clone())
-                            .await
-                            .unwrap();
-                        sender_clone
-                            .send(FuzzEvent::Cancelled(std::mem::take(
-                                &mut open_reservations,
-                            )))
-                            .await
-                            .unwrap();
-                    }
-                }
-            });
-        }
-
-        drop(sender);
-
-        while let Some(event) = receiver.recv().await {
-            match event {
-                FuzzEvent::Reserved(reservations) => {
-                    self.reservations.extend(reservations);
-                    assert!(
-                        self.reservations.len() <= total_slots as usize,
-                        "More than total number of slots was reserved"
-                    );
-                }
-                FuzzEvent::Cancelled(reservations) => {
-                    for res in reservations {
-                        let idx = self
-                            .reservations
-                            .iter()
-                            .find_position(|r| r.executor_id == res.executor_id);
-                        assert!(idx.is_some(), "Received invalid cancellation, no existing reservation for executor ID {}", res.executor_id);
-
-                        self.reservations.swap_remove(idx.unwrap().0);
-                    }
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
-
-#[derive(Debug, Clone)]
-enum FuzzEvent {
-    Reserved(Vec<ExecutorReservation>),
-    Cancelled(Vec<ExecutorReservation>),
-}
-
-pub async fn test_fuzz_reservations<S: ClusterState>(
-    state: S,
-    concurrency: usize,
-    distribution: TaskDistribution,
-    num_executors: usize,
-    task_slots_per_executor: usize,
-) -> Result<()> {
-    let mut test = ClusterStateTest::new(state).await?;
-
-    for idx in 0..num_executors {
-        test = test
-            .register_executor(idx.to_string().as_str(), task_slots_per_executor as u32)
-            .await?;
-    }
-
-    test.fuzz_reservation(concurrency, distribution).await
-}
-
-pub async fn test_executor_registration<S: ClusterState>(state: S) -> Result<()> {
-    let test = ClusterStateTest::new(state).await?;
-
-    test.register_executor("1", 10)
-        .await?
-        .register_executor("2", 10)
-        .await?
-        .register_executor("3", 10)
-        .await?
-        .assert_live_executor("1", 10)
-        .await?
-        .assert_live_executor("2", 10)
-        .await?
-        .assert_live_executor("3", 10)
-        .await?
-        .remove_executor("1")
-        .await?
-        .assert_dead_executor("1")
-        .await?
-        .remove_executor("2")
-        .await?
-        .assert_dead_executor("2")
-        .await?
-        .remove_executor("3")
-        .await?
-        .assert_dead_executor("3")
-        .await?;
-
-    Ok(())
-}
-
-pub async fn test_reservation<S: ClusterState>(
-    state: S,
-    distribution: TaskDistribution,
-) -> Result<()> {
-    let test = ClusterStateTest::new(state).await?;
-
-    test.register_executor("1", 10)
-        .await?
-        .register_executor("2", 10)
-        .await?
-        .register_executor("3", 10)
-        .await?
-        .try_reserve_slots(10, distribution, None, false)
-        .await?
-        .assert_open_reservations(10)
-        .cancel_reservations(10)
-        .await?
-        .try_reserve_slots(30, distribution, None, false)
-        .await?
-        .assert_open_reservations(30)
-        .cancel_reservations(15)
-        .await?
-        .assert_open_reservations(15)
-        .try_reserve_slots(30, distribution, None, false)
-        .await?
-        .assert_open_reservations(30)
-        .cancel_reservations(30)
-        .await?
-        .assert_open_reservations(0)
-        .try_reserve_slots(50, distribution, None, false)
-        .await?
-        .assert_open_reservations(30)
-        .cancel_reservations(30)
-        .await?
-        .try_reserve_slots(20, distribution, Some(vec!["1".to_string()]), false)
-        .await?
-        .assert_open_reservations_with(10, |res| res.executor_id == "1")
-        .cancel_reservations(10)
-        .await?
-        .try_reserve_slots(
-            20,
-            distribution,
-            Some(vec!["2".to_string(), "3".to_string()]),
-            false,
-        )
-        .await?
-        .assert_open_reservations_with(20, |res| {
-            res.executor_id == "2" || res.executor_id == "3"
-        });
-
-    Ok(())
-}
-
-pub struct JobStateTest<S: JobState> {
-    state: Arc<S>,
-    events: Arc<RwLock<Vec<JobStateEvent>>>,
-}
-
-impl<S: JobState> JobStateTest<S> {
-    pub async fn new(state: S) -> Result<Self> {
-        let events = Arc::new(RwLock::new(vec![]));
-
-        let mut event_stream = state.job_state_events().await?;
-        let events_clone = events.clone();
-        tokio::spawn(async move {
-            while let Some(event) = event_stream.next().await {
-                let mut guard = events_clone.write().await;
-
-                guard.push(event);
-            }
-        });
-
-        Ok(Self {
-            state: Arc::new(state),
-            events,
-        })
-    }
-
-    pub async fn queue_job(self, job_id: &str) -> Result<Self> {
-        self.state
-            .accept_job(job_id, "", timestamp_millis())
-            .await?;
-        Ok(self)
-    }
-
-    pub async fn fail_planning(self, job_id: &str) -> Result<Self> {
-        self.state
-            .fail_unscheduled_job(job_id, "failed planning".to_string())
-            .await?;
-        Ok(self)
-    }
-
-    pub async fn assert_queued(self, job_id: &str) -> Result<Self> {
-        let status = self.state.get_job_status(job_id).await?;
-
-        assert!(status.is_some(), "Queued job {} not found", job_id);
-
-        let status = status.unwrap();
-        assert!(
-            matches!(&status, JobStatus {
-            job_id: status_job_id, status: Some(Status::Queued(_)), ..
-        } if status_job_id.as_str() == job_id),
-            "Expected queued status but found {:?}",
-            status
-        );
-
-        Ok(self)
-    }
-
-    pub async fn submit_job(self, graph: &ExecutionGraph) -> Result<Self> {
-        self.state
-            .submit_job(graph.job_id().to_string(), graph)
-            .await?;
-        Ok(self)
-    }
-
-    pub async fn assert_job_running(self, job_id: &str) -> Result<Self> {
-        let status = self.state.get_job_status(job_id).await?;
-
-        assert!(status.is_some(), "Job status not found for {}", job_id);
-
-        let status = status.unwrap();
-        assert!(
-            matches!(&status, JobStatus {
-            job_id: status_job_id, status: Some(Status::Running(_)), ..
-        } if status_job_id.as_str() == job_id),
-            "Expected running status but found {:?}",
-            status
-        );
-
-        Ok(self)
-    }
-
-    pub async fn update_job(self, graph: &ExecutionGraph) -> Result<Self> {
-        self.state.save_job(graph.job_id(), graph).await?;
-        Ok(self)
-    }
-
-    pub async fn assert_job_failed(self, job_id: &str) -> Result<Self> {
-        let status = self.state.get_job_status(job_id).await?;
-
-        assert!(status.is_some(), "Job status not found for {}", job_id);
-
-        let status = status.unwrap();
-        assert!(
-            matches!(&status, JobStatus {
-            job_id: status_job_id, status: Some(Status::Failed(_)), ..
-        } if status_job_id.as_str() == job_id),
-            "Expected failed status but found {:?}",
-            status
-        );
-
-        Ok(self)
-    }
-
-    pub async fn assert_job_successful(self, job_id: &str) -> Result<Self> {
-        let status = self.state.get_job_status(job_id).await?;
-
-        assert!(status.is_some(), "Job status not found for {}", job_id);
-        let status = status.unwrap();
-        assert!(
-            matches!(&status, JobStatus {
-            job_id: status_job_id, status: Some(Status::Successful(_)), ..
-        } if status_job_id.as_str() == job_id),
-            "Expected success status but found {:?}",
-            status
-        );
-
-        Ok(self)
-    }
-
-    pub async fn assert_event(self, event: JobStateEvent) -> Result<Self> {
-        let events = self.events.clone();
-        let found = await_condition(Duration::from_millis(50), 10, || async {
-            let guard = events.read().await;
-
-            Ok(guard.iter().any(|ev| ev == &event))
-        })
-        .await?;
-
-        assert!(found, "Expected event {:?}", event);
-
-        Ok(self)
-    }
-}
-
-pub async fn test_job_lifecycle<S: JobState>(
-    state: S,
-    mut graph: ExecutionGraph,
-) -> Result<()> {
-    let test = JobStateTest::new(state).await?;
-
-    let job_id = graph.job_id().to_string();
-
-    let test = test
-        .queue_job(&job_id)
-        .await?
-        .assert_queued(&job_id)
-        .await?
-        .submit_job(&graph)
-        .await?
-        .assert_job_running(&job_id)
-        .await?;
-
-    drain_tasks(&mut graph)?;
-    graph.succeed_job()?;
-
-    test.update_job(&graph)
-        .await?
-        .assert_job_successful(&job_id)
-        .await?;
-
-    Ok(())
-}
-
-pub async fn test_job_planning_failure<S: JobState>(
-    state: S,
-    graph: ExecutionGraph,
-) -> Result<()> {
-    let test = JobStateTest::new(state).await?;
-
-    let job_id = graph.job_id().to_string();
-
-    test.queue_job(&job_id)
-        .await?
-        .fail_planning(&job_id)
-        .await?
-        .assert_job_failed(&job_id)
-        .await?;
-
-    Ok(())
-}
-
-fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
-    let executor = mock_executor("executor-id1".to_string());
-    while let Some(task) = graph.pop_next_task(&executor.id)? {
-        let task_status = mock_completed_task(task, &executor.id);
-        graph.update_task_status(&executor, vec![task_status], 1, 1)?;
-    }
-
-    Ok(())
-}
diff --git a/ballista/scheduler/src/cluster/test_util/mod.rs b/ballista/scheduler/src/cluster/test_util/mod.rs
new file mode 100644
index 00000000..64da6c60
--- /dev/null
+++ b/ballista/scheduler/src/cluster/test_util/mod.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cluster::{JobState, JobStateEvent};
+use crate::scheduler_server::timestamp_millis;
+use crate::state::execution_graph::ExecutionGraph;
+use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
+use ballista_core::error::Result;
+use ballista_core::serde::protobuf::job_status::Status;
+use ballista_core::serde::protobuf::JobStatus;
+use futures::StreamExt;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::RwLock;
+
+pub struct JobStateTest<S: JobState> {
+    state: Arc<S>,
+    events: Arc<RwLock<Vec<JobStateEvent>>>,
+}
+
+impl<S: JobState> JobStateTest<S> {
+    pub async fn new(state: S) -> Result<Self> {
+        let events = Arc::new(RwLock::new(vec![]));
+
+        let mut event_stream = state.job_state_events().await?;
+        let events_clone = events.clone();
+        tokio::spawn(async move {
+            while let Some(event) = event_stream.next().await {
+                let mut guard = events_clone.write().await;
+
+                guard.push(event);
+            }
+        });
+
+        Ok(Self {
+            state: Arc::new(state),
+            events,
+        })
+    }
+
+    pub fn queue_job(self, job_id: &str) -> Result<Self> {
+        self.state.accept_job(job_id, "", timestamp_millis())?;
+        Ok(self)
+    }
+
+    pub async fn fail_planning(self, job_id: &str) -> Result<Self> {
+        self.state
+            .fail_unscheduled_job(job_id, "failed planning".to_string())
+            .await?;
+        Ok(self)
+    }
+
+    pub async fn assert_queued(self, job_id: &str) -> Result<Self> {
+        let status = self.state.get_job_status(job_id).await?;
+
+        assert!(status.is_some(), "Queued job {} not found", job_id);
+
+        let status = status.unwrap();
+        assert!(
+            matches!(&status, JobStatus {
+            job_id: status_job_id, status: Some(Status::Queued(_)), ..
+        } if status_job_id.as_str() == job_id),
+            "Expected queued status but found {:?}",
+            status
+        );
+
+        Ok(self)
+    }
+
+    pub async fn submit_job(self, graph: &ExecutionGraph) -> Result<Self> {
+        self.state
+            .submit_job(graph.job_id().to_string(), graph)
+            .await?;
+        Ok(self)
+    }
+
+    pub async fn assert_job_running(self, job_id: &str) -> Result<Self> {
+        let status = self.state.get_job_status(job_id).await?;
+
+        assert!(status.is_some(), "Job status not found for {}", job_id);
+
+        let status = status.unwrap();
+        assert!(
+            matches!(&status, JobStatus {
+            job_id: status_job_id, status: Some(Status::Running(_)), ..
+        } if status_job_id.as_str() == job_id),
+            "Expected running status but found {:?}",
+            status
+        );
+
+        Ok(self)
+    }
+
+    pub async fn update_job(self, graph: &ExecutionGraph) -> Result<Self> {
+        self.state.save_job(graph.job_id(), graph).await?;
+        Ok(self)
+    }
+
+    pub async fn assert_job_failed(self, job_id: &str) -> Result<Self> {
+        let status = self.state.get_job_status(job_id).await?;
+
+        assert!(status.is_some(), "Job status not found for {}", job_id);
+
+        let status = status.unwrap();
+        assert!(
+            matches!(&status, JobStatus {
+            job_id: status_job_id, status: Some(Status::Failed(_)), ..
+        } if status_job_id.as_str() == job_id),
+            "Expected failed status but found {:?}",
+            status
+        );
+
+        Ok(self)
+    }
+
+    pub async fn assert_job_successful(self, job_id: &str) -> Result<Self> {
+        let status = self.state.get_job_status(job_id).await?;
+
+        assert!(status.is_some(), "Job status not found for {}", job_id);
+        let status = status.unwrap();
+        assert!(
+            matches!(&status, JobStatus {
+            job_id: status_job_id, status: Some(Status::Successful(_)), ..
+        } if status_job_id.as_str() == job_id),
+            "Expected success status but found {:?}",
+            status
+        );
+
+        Ok(self)
+    }
+
+    pub async fn assert_event(self, event: JobStateEvent) -> Result<Self> {
+        let events = self.events.clone();
+        let found = await_condition(Duration::from_millis(50), 10, || async {
+            let guard = events.read().await;
+
+            Ok(guard.iter().any(|ev| ev == &event))
+        })
+        .await?;
+
+        assert!(found, "Expected event {:?}", event);
+
+        Ok(self)
+    }
+}
+
+pub async fn test_job_lifecycle<S: JobState>(
+    state: S,
+    mut graph: ExecutionGraph,
+) -> Result<()> {
+    let test = JobStateTest::new(state).await?;
+
+    let job_id = graph.job_id().to_string();
+
+    let test = test
+        .queue_job(&job_id)?
+        .assert_queued(&job_id)
+        .await?
+        .submit_job(&graph)
+        .await?
+        .assert_job_running(&job_id)
+        .await?;
+
+    drain_tasks(&mut graph)?;
+    graph.succeed_job()?;
+
+    test.update_job(&graph)
+        .await?
+        .assert_job_successful(&job_id)
+        .await?;
+
+    Ok(())
+}
+
+pub async fn test_job_planning_failure<S: JobState>(
+    state: S,
+    graph: ExecutionGraph,
+) -> Result<()> {
+    let test = JobStateTest::new(state).await?;
+
+    let job_id = graph.job_id().to_string();
+
+    test.queue_job(&job_id)?
+        .fail_planning(&job_id)
+        .await?
+        .assert_job_failed(&job_id)
+        .await?;
+
+    Ok(())
+}
+
+fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
+    let executor = mock_executor("executor-id1".to_string());
+    while let Some(task) = graph.pop_next_task(&executor.id)? {
+        let task_status = mock_completed_task(task, &executor.id);
+        graph.update_task_status(&executor, vec![task_status], 1, 1)?;
+    }
+
+    Ok(())
+}
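
The fluent assertions in the new test harness above poll for asynchronous state
changes through await_condition(interval, attempts, condition) from
crate::test_utils. As a rough sketch of that polling pattern (a hypothetical
stand-in, not the actual helper), it amounts to something like this:

use std::future::Future;
use std::time::Duration;

// Retry an async condition up to `attempts` times, sleeping `interval` between
// tries, and report whether it ever returned true.
async fn poll_until<F, Fut, E>(
    interval: Duration,
    attempts: usize,
    mut condition: F,
) -> Result<bool, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<bool, E>>,
{
    for _ in 0..attempts {
        if condition().await? {
            return Ok(true);
        }
        tokio::time::sleep(interval).await;
    }
    Ok(false)
}
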
diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index ed828ec0..99b938a1 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -15,14 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::state::executor_manager::ExecutorReservation;
 use std::fmt::{Debug, Formatter};
 
 use datafusion::logical_expr::LogicalPlan;
 
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::TaskStatus;
-use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use std::sync::Arc;
 
@@ -37,12 +35,8 @@ pub enum QueryStageSchedulerEvent {
     },
     JobSubmitted {
         job_id: String,
-        job_name: String,
-        session_id: String,
         queued_at: u64,
         submitted_at: u64,
-        resubmit: bool,
-        plan: Arc<dyn ExecutionPlan>,
     },
     // For a job which failed during planning
     JobPlanningFailed {
@@ -67,7 +61,7 @@ pub enum QueryStageSchedulerEvent {
     JobCancel(String),
     JobDataClean(String),
     TaskUpdating(String, Vec<TaskStatus>),
-    ReservationOffering(Vec<ExecutorReservation>),
+    ReviveOffers,
     ExecutorLost(String, Option<String>),
     CancelTasks(Vec<RunningTaskInfo>),
 }
@@ -80,10 +74,8 @@ impl Debug for QueryStageSchedulerEvent {
             } => {
                 write!(f, "JobQueued : job_id={job_id}, job_name={job_name}.")
             }
-            QueryStageSchedulerEvent::JobSubmitted {
-                job_id, resubmit, ..
-            } => {
-                write!(f, "JobSubmitted : job_id={job_id}, resubmit={resubmit}.")
+            QueryStageSchedulerEvent::JobSubmitted { job_id, .. } => {
+                write!(f, "JobSubmitted : job_id={job_id}.")
             }
             QueryStageSchedulerEvent::JobPlanningFailed {
                 job_id,
@@ -129,8 +121,8 @@ impl Debug for QueryStageSchedulerEvent {
             QueryStageSchedulerEvent::TaskUpdating(job_id, status) => {
                 write!(f, "TaskUpdating : job_id={job_id}, status:[{status:?}].")
             }
-            QueryStageSchedulerEvent::ReservationOffering(reservations) => {
-                write!(f, "ReservationOffering : reservations:[{reservations:?}].")
+            QueryStageSchedulerEvent::ReviveOffers => {
+                write!(f, "ReviveOffers.")
             }
             QueryStageSchedulerEvent::ExecutorLost(job_id, reason) => {
                 write!(f, "ExecutorLost : job_id={job_id}, reason:[{reason:?}].")
diff --git a/ballista/scheduler/src/scheduler_server/external_scaler.rs b/ballista/scheduler/src/scheduler_server/external_scaler.rs
index 104037e2..f2feb243 100644
--- a/ballista/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/scheduler/src/scheduler_server/external_scaler.rs
@@ -25,7 +25,8 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
 
 use tonic::{Request, Response};
 
-const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
+const PENDING_JOBS_METRIC_NAME: &str = "pending_jobs";
+const RUNNING_JOBS_METRIC_NAME: &str = "running_jobs";
 
 #[tonic::async_trait]
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
@@ -44,8 +45,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
     ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
         Ok(Response::new(GetMetricSpecResponse {
             metric_specs: vec![MetricSpec {
-                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
-                target_size: 1,
+                metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
+                target_size: 0,
             }],
         }))
     }
@@ -55,10 +56,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
         _request: Request<GetMetricsRequest>,
     ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
         Ok(Response::new(GetMetricsResponse {
-            metric_values: vec![MetricValue {
-                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
-                metric_value: self.pending_tasks() as i64,
-            }],
+            metric_values: vec![
+                MetricValue {
+                    metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
+                    metric_value: self.pending_job_number() as i64,
+                },
+                MetricValue {
+                    metric_name: RUNNING_JOBS_METRIC_NAME.to_string(),
+                    metric_value: self.running_job_number() as i64,
+                },
+            ],
         }))
     }
 }
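
With this change the KEDA external scaler reports job-level backlog rather than
an in-flight task count: pending_jobs counts jobs the scheduler has accepted but
not yet started running, running_jobs those currently executing, and the
pending_jobs metric spec uses target_size 0 so any backlog signals demand for
more executors. A tiny illustrative decision rule (the struct and function below
are hypothetical, not part of this commit or of KEDA):

// Hypothetical consumer-side view of the two metrics returned above.
struct SchedulerMetrics {
    pending_jobs: i64,
    running_jobs: i64,
}

// With target_size = 0 on pending_jobs, any queued-but-unscheduled job is a
// signal to scale out, regardless of how many jobs are already running.
fn needs_more_executors(m: &SchedulerMetrics) -> bool {
    m.pending_jobs > 0
}
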
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 485afa8c..f1230c71 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -23,15 +23,16 @@ use std::convert::TryInto;
 use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
-    execute_query_failure_result, execute_query_result, CancelJobParams, CancelJobResult,
-    CleanJobDataParams, CleanJobDataResult, CreateSessionParams, CreateSessionResult,
-    ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult,
-    ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams,
-    ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
-    PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
-    RemoveSessionParams, RemoveSessionResult, UpdateSessionParams, UpdateSessionResult,
-    UpdateTaskStatusParams, UpdateTaskStatusResult,
+    execute_query_failure_result, execute_query_result, AvailableTaskSlots,
+    CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
+    CreateSessionParams, CreateSessionResult, ExecuteQueryFailureResult,
+    ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat,
+    ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+    HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+    RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult,
+    UpdateSessionParams, UpdateSessionResult, UpdateTaskStatusParams,
+    UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
 
@@ -46,13 +47,14 @@ use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
 use std::ops::Deref;
 use std::sync::Arc;
 
+use crate::cluster::{bind_task_bias, bind_task_round_robin};
+use crate::config::TaskDistribution;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use datafusion::prelude::SessionContext;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
 use crate::scheduler_server::SchedulerServer;
-use crate::state::executor_manager::ExecutorReservation;
 
 #[tonic::async_trait]
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
@@ -76,63 +78,69 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         } = request.into_inner()
         {
             trace!("Received poll_work request for {:?}", metadata);
-            let metadata = ExecutorMetadata {
-                id: metadata.id,
-                host: metadata
-                    .optional_host
-                    .map(|h| match h {
-                        OptionalHost::Host(host) => host,
-                    })
-                    .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
-                port: metadata.port as u16,
-                grpc_port: metadata.grpc_port as u16,
-                specification: metadata.specification.unwrap().into(),
-            };
+            let executor_id = metadata.id.clone();
 
-            self.state
-                .executor_manager
-                .save_executor_metadata(metadata.clone())
-                .await
-                .map_err(|e| {
-                    let msg = format!("Could not save executor metadata: {e}");
-                    error!("{}", msg);
-                    Status::internal(msg)
-                })?;
+            // Saving the metadata is not strictly required for task assignment;
+            // it only gives the scheduler a picture of the whole executor cluster.
+            {
+                let metadata = ExecutorMetadata {
+                    id: metadata.id,
+                    host: metadata
+                        .optional_host
+                        .map(|h| match h {
+                            OptionalHost::Host(host) => host,
+                        })
+                        .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
+                    port: metadata.port as u16,
+                    grpc_port: metadata.grpc_port as u16,
+                    specification: metadata.specification.unwrap().into(),
+                };
+                if let Err(e) = self
+                    .state
+                    .executor_manager
+                    .save_executor_metadata(metadata)
+                    .await
+                {
+                    warn!("Could not save executor metadata: {:?}", e);
+                }
+            }
 
-            self.update_task_status(&metadata.id, task_status)
+            self.update_task_status(&executor_id, task_status)
                 .await
                 .map_err(|e| {
                     let msg = format!(
                         "Fail to update tasks status from executor {:?} due to {:?}",
-                        &metadata.id, e
+                        &executor_id, e
                     );
                     error!("{}", msg);
                     Status::internal(msg)
                 })?;
 
-            // Find `num_free_slots` next tasks when available
-            let mut next_tasks = vec![];
-            let reservations = vec![
-                ExecutorReservation::new_free(metadata.id.clone());
-                num_free_slots as usize
-            ];
-            if let Ok((mut assignments, _, _)) = self
-                .state
-                .task_manager
-                .fill_reservations(&reservations)
-                .await
-            {
-                while let Some((_, task)) = assignments.pop() {
-                    match self.state.task_manager.prepare_task_definition(task) {
-                        Ok(task_definition) => next_tasks.push(task_definition),
-                        Err(e) => {
-                            error!("Error preparing task definition: {:?}", e);
-                        }
+            let mut available_slots = vec![AvailableTaskSlots {
+                executor_id,
+                slots: num_free_slots,
+            }];
+            let available_slots = available_slots.iter_mut().collect();
+            let active_jobs = self.state.task_manager.get_running_job_cache();
+            let schedulable_tasks = match self.state.config.task_distribution {
+                TaskDistribution::Bias => {
+                    bind_task_bias(available_slots, active_jobs, |_| false).await
+                }
+                TaskDistribution::RoundRobin => {
+                    bind_task_round_robin(available_slots, active_jobs, |_| false).await
+                }
+            };
+
+            let mut tasks = vec![];
+            for (_, task) in schedulable_tasks {
+                match self.state.task_manager.prepare_task_definition(task) {
+                    Ok(task_definition) => tasks.push(task_definition),
+                    Err(e) => {
+                        error!("Error preparing task definition: {:?}", e);
                     }
                 }
             }
-
-            Ok(Response::new(PollWorkResult { tasks: next_tasks }))
+            Ok(Response::new(PollWorkResult { tasks }))
         } else {
             warn!("Received invalid executor poll_work request");
             Err(Status::invalid_argument("Missing metadata in request"))
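
The new poll_work body above illustrates the task-first philosophy from the
commit message: instead of reserving an executor's slots up front, the scheduler
takes the polling executor's free slot count, wraps it in AvailableTaskSlots,
and binds runnable tasks onto those slots with either a bias or round-robin
distribution. Below is a simplified, self-contained model of the two strategies;
the types and function names are illustrative stand-ins, not the real
bind_task_bias / bind_task_round_robin signatures, which also consult the
running-job cache.

use std::collections::HashMap;

#[derive(Debug)]
struct Slots {
    executor_id: String,
    available: u32,
}

// Bias strategy: fill one executor's free slots completely before moving on
// to the next, which keeps tasks packed onto fewer executors.
fn bind_bias(slots: &mut [Slots], mut pending_tasks: u32) -> Vec<(String, u32)> {
    let mut assignments = Vec::new();
    for slot in slots.iter_mut() {
        if pending_tasks == 0 {
            break;
        }
        let n = slot.available.min(pending_tasks);
        if n > 0 {
            slot.available -= n;
            pending_tasks -= n;
            assignments.push((slot.executor_id.clone(), n));
        }
    }
    assignments
}

// Round-robin strategy: hand out one slot per executor per pass, spreading
// tasks evenly across the cluster.
fn bind_round_robin(slots: &mut [Slots], mut pending_tasks: u32) -> Vec<(String, u32)> {
    let mut assignments: HashMap<String, u32> = HashMap::new();
    loop {
        let mut assigned_this_pass = false;
        for slot in slots.iter_mut() {
            if pending_tasks == 0 {
                break;
            }
            if slot.available > 0 {
                slot.available -= 1;
                pending_tasks -= 1;
                *assignments.entry(slot.executor_id.clone()).or_default() += 1;
                assigned_this_pass = true;
            }
        }
        if pending_tasks == 0 || !assigned_this_pass {
            break;
        }
    }
    assignments.into_iter().collect()
}

fn main() {
    let mut a = vec![
        Slots { executor_id: "exec-1".into(), available: 4 },
        Slots { executor_id: "exec-2".into(), available: 4 },
    ];
    let mut b = vec![
        Slots { executor_id: "exec-1".into(), available: 4 },
        Slots { executor_id: "exec-2".into(), available: 4 },
    ];
    // Bias packs exec-1 first: [("exec-1", 4), ("exec-2", 1)].
    println!("{:?}", bind_bias(&mut a, 5));
    // Round-robin alternates passes: exec-1 gets 3, exec-2 gets 2.
    println!("{:?}", bind_round_robin(&mut b, 5));
}
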
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 74bf3359..25fa4296 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -20,7 +20,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use ballista_core::error::Result;
 use ballista_core::event_loop::{EventLoop, EventSender};
-use ballista_core::serde::protobuf::{StopExecutorParams, TaskStatus};
+use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::BallistaCodec;
 
 use datafusion::execution::context::SessionState;
@@ -38,7 +38,7 @@ use log::{error, warn};
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 
-use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
+use crate::state::executor_manager::ExecutorManager;
 
 use crate::state::task_manager::TaskLauncher;
 use crate::state::SchedulerState;
@@ -146,13 +146,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         Ok(())
     }
 
-    #[cfg(test)]
-    pub(crate) fn query_stage_scheduler(&self) -> Arc<QueryStageScheduler<T, U>> {
-        self.query_stage_scheduler.clone()
+    pub fn pending_job_number(&self) -> usize {
+        self.state.task_manager.pending_job_number()
     }
 
-    pub(crate) fn pending_tasks(&self) -> usize {
-        self.query_stage_scheduler.pending_tasks()
+    pub fn running_job_number(&self) -> usize {
+        self.state.task_manager.running_job_number()
     }
 
     pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
@@ -204,13 +203,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             .await
     }
 
-    pub(crate) async fn offer_reservation(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
+    pub(crate) async fn revive_offers(&self) -> Result<()> {
         self.query_stage_event_loop
             .get_sender()?
-            .post_event(QueryStageSchedulerEvent::ReservationOffering(reservations))
+            .post_event(QueryStageSchedulerEvent::ReviveOffers)
             .await
     }
 
@@ -224,7 +220,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                 let expired_executors = state.executor_manager.get_expired_executors();
                 for expired in expired_executors {
                     let executor_id = expired.executor_id.clone();
-                    let executor_manager = state.executor_manager.clone();
 
                     let sender_clone = event_sender.clone();
 
@@ -251,7 +246,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
 
                     // If executor is expired, remove it immediately
                     Self::remove_executor(
-                        executor_manager,
+                        state.executor_manager.clone(),
                         sender_clone,
                         &executor_id,
                         Some(stop_reason.clone()),
@@ -261,31 +256,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                     // If executor is not already terminating then stop it. If it is terminating then it should already be shutting
                     // down and we do not need to do anything here.
                     if !terminating {
-                        match state.executor_manager.get_client(&executor_id).await {
-                            Ok(mut client) => {
-                                tokio::task::spawn(async move {
-                                    match client
-                                        .stop_executor(StopExecutorParams {
-                                            executor_id,
-                                            reason: stop_reason,
-                                            force: true,
-                                        })
-                                        .await
-                                    {
-                                        Err(error) => {
-                                            warn!(
-                                            "Failed to send stop_executor rpc due to, {}",
-                                            error
-                                        );
-                                        }
-                                        Ok(_value) => {}
-                                    }
-                                });
-                            }
-                            Err(_) => {
-                                warn!("Executor is already dead, failed to connect to Executor {}", executor_id);
-                            }
-                        }
+                        state
+                            .executor_manager
+                            .stop_executor(&executor_id, stop_reason)
+                            .await;
                     }
                 }
                 tokio::time::sleep(Duration::from_secs(
@@ -334,16 +308,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         };
 
         // Save the executor to state
-        let reservations = self
-            .state
+        self.state
             .executor_manager
-            .register_executor(metadata, executor_data, false)
+            .register_executor(metadata, executor_data)
             .await?;
 
         // If we are using push-based scheduling then reserve this executors slots and send
         // them for scheduling tasks.
         if self.state.config.is_push_staged_scheduling() {
-            self.offer_reservation(reservations).await?;
+            self.revive_offers().await?;
         }
 
         Ok(())
@@ -412,7 +385,7 @@ mod test {
             scheduler
                 .state
                 .executor_manager
-                .register_executor(executor_metadata, executor_data, false)
+                .register_executor(executor_metadata, executor_data)
                 .await?;
         }
 
@@ -430,23 +403,15 @@ mod test {
         scheduler
             .state
             .task_manager
-            .queue_job(job_id, "", timestamp_millis())
-            .await?;
+            .queue_job(job_id, "", timestamp_millis())?;
 
-        // Plan job
-        let plan = scheduler
+        // Submit job
+        scheduler
             .state
-            .plan_job(job_id, ctx.clone(), &plan)
+            .submit_job(job_id, "", ctx, &plan, 0)
             .await
             .expect("submitting plan");
 
-        //Submit job plan
-        scheduler
-            .state
-            .task_manager
-            .submit_job(job_id, "", &ctx.session_id(), plan, 0)
-            .await?;
-
         // Refresh the ExecutionGraph
         while let Some(graph) = scheduler
             .state
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 1cd611b8..138f7090 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -35,7 +34,6 @@ use tokio::time::Instant;
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
-use crate::state::executor_manager::ExecutorReservation;
 use crate::state::SchedulerState;
 
 pub(crate) struct QueryStageScheduler<
@@ -44,7 +42,6 @@ pub(crate) struct QueryStageScheduler<
 > {
     state: Arc<SchedulerState<T, U>>,
     metrics_collector: Arc<dyn SchedulerMetricsCollector>,
-    pending_tasks: AtomicUsize,
     config: Arc<SchedulerConfig>,
 }
 
@@ -57,21 +54,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
         Self {
             state,
             metrics_collector,
-            pending_tasks: AtomicUsize::default(),
             config,
         }
     }
 
-    pub(crate) fn set_pending_tasks(&self, tasks: usize) {
-        self.pending_tasks.store(tasks, Ordering::SeqCst);
-        self.metrics_collector
-            .set_pending_tasks_queue_size(tasks as u64);
-    }
-
-    pub(crate) fn pending_tasks(&self) -> usize {
-        self.pending_tasks.load(Ordering::SeqCst)
-    }
-
     pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
         self.metrics_collector.as_ref()
     }
@@ -99,7 +85,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         if self.config.scheduler_event_expected_processing_duration > 0 {
             time_recorder = Some((Instant::now(), event.clone()));
         };
-        let tx_event = EventSender::new(tx_event.clone());
+        let event_sender = EventSender::new(tx_event.clone());
         match event {
             QueryStageSchedulerEvent::JobQueued {
                 job_id,
@@ -110,127 +96,55 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
             } => {
                 info!("Job {} queued with name {:?}", job_id, job_name);
 
-                self.state
+                if let Err(e) = self
+                    .state
                     .task_manager
                     .queue_job(&job_id, &job_name, queued_at)
-                    .await?;
+                {
+                    error!("Fail to queue job {} due to {:?}", job_id, e);
+                    return Ok(());
+                }
 
                 let state = self.state.clone();
                 tokio::spawn(async move {
-                    let event =
-                        match state.plan_job(&job_id, session_ctx.clone(), &plan).await {
-                            Ok(plan) => QueryStageSchedulerEvent::JobSubmitted {
-                                job_id,
-                                job_name,
-                                session_id: session_ctx.session_id(),
-                                queued_at,
-                                submitted_at: timestamp_millis(),
-                                resubmit: false,
-                                plan,
-                            },
-                            Err(error) => {
-                                let fail_message =
-                                    format!("Error planning job {job_id}: {error:?}");
-                                error!("{}", &fail_message);
-                                QueryStageSchedulerEvent::JobPlanningFailed {
-                                    job_id,
-                                    fail_message,
-                                    queued_at,
-                                    failed_at: timestamp_millis(),
-                                }
-                            }
-                        };
-                    if let Err(e) = tx_event.post_event(event).await {
+                    let event = if let Err(e) = state
+                        .submit_job(&job_id, &job_name, session_ctx, &plan, queued_at)
+                        .await
+                    {
+                        let fail_message = format!("Error planning job {job_id}: {e:?}");
+                        error!("{}", &fail_message);
+                        QueryStageSchedulerEvent::JobPlanningFailed {
+                            job_id,
+                            fail_message,
+                            queued_at,
+                            failed_at: timestamp_millis(),
+                        }
+                    } else {
+                        QueryStageSchedulerEvent::JobSubmitted {
+                            job_id,
+                            queued_at,
+                            submitted_at: timestamp_millis(),
+                        }
+                    };
+                    if let Err(e) = event_sender.post_event(event).await {
                         error!("Fail to send event due to {}", e);
                     }
                 });
             }
             QueryStageSchedulerEvent::JobSubmitted {
                 job_id,
-                job_name,
-                session_id,
                 queued_at,
                 submitted_at,
-                resubmit,
-                plan,
             } => {
-                if !resubmit {
-                    self.metrics_collector.record_submitted(
-                        &job_id,
-                        queued_at,
-                        submitted_at,
-                    );
-                    self.state
-                        .task_manager
-                        .submit_job(
-                            job_id.as_str(),
-                            job_name.as_str(),
-                            session_id.as_str(),
-                            plan.clone(),
-                            queued_at,
-                        )
-                        .await?;
-                    info!("Job {} submitted", job_id);
-                } else {
-                    debug!("Job {} resubmitted", job_id);
-                }
+                self.metrics_collector
+                    .record_submitted(&job_id, queued_at, submitted_at);
+
+                info!("Job {} submitted", job_id);
 
                 if self.state.config.is_push_staged_scheduling() {
-                    let available_tasks = self
-                        .state
-                        .task_manager
-                        .get_available_task_count(&job_id)
+                    event_sender
+                        .post_event(QueryStageSchedulerEvent::ReviveOffers)
                         .await?;
-
-                    let reservations: Vec<ExecutorReservation> = self
-                        .state
-                        .executor_manager
-                        .reserve_slots(available_tasks as u32)
-                        .await?
-                        .into_iter()
-                        .map(|res| res.assign(job_id.clone()))
-                        .collect();
-
-                    if reservations.is_empty()
-                        && self.config.job_resubmit_interval_ms.is_some()
-                    {
-                        let wait_ms = self.config.job_resubmit_interval_ms.unwrap();
-
-                        debug!(
-                            "No task slots reserved for job {job_id}, resubmitting after {wait_ms}ms"
-                        );
-
-                        tokio::task::spawn(async move {
-                            tokio::time::sleep(Duration::from_millis(wait_ms)).await;
-
-                            if let Err(e) = tx_event
-                                .post_event(QueryStageSchedulerEvent::JobSubmitted {
-                                    job_id,
-                                    job_name,
-                                    session_id,
-                                    queued_at,
-                                    submitted_at,
-                                    resubmit: true,
-                                    plan: plan.clone(),
-                                })
-                                .await
-                            {
-                                error!("error resubmitting job: {}", e);
-                            }
-                        });
-                    } else {
-                        debug!(
-                            "Reserved {} task slots for submitted job {}",
-                            reservations.len(),
-                            job_id
-                        );
-
-                        tx_event
-                            .post_event(QueryStageSchedulerEvent::ReservationOffering(
-                                reservations,
-                            ))
-                            .await?;
-                    }
                 }
             }
             QueryStageSchedulerEvent::JobPlanningFailed {
@@ -243,10 +157,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     .record_failed(&job_id, queued_at, failed_at);
 
                 error!("Job {} failed: {}", job_id, fail_message);
-                self.state
+                if let Err(e) = self
+                    .state
                     .task_manager
                     .fail_unscheduled_job(&job_id, fail_message)
-                    .await?;
+                    .await
+                {
+                    error!(
+                        "Fail to invoke fail_unscheduled_job for job {} due to {:?}",
+                        job_id, e
+                    );
+                }
             }
             QueryStageSchedulerEvent::JobFinished {
                 job_id,
@@ -257,7 +178,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     .record_completed(&job_id, queued_at, completed_at);
 
                 info!("Job {} success", job_id);
-                self.state.task_manager.succeed_job(&job_id).await?;
+                if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
+                    error!(
+                        "Fail to invoke succeed_job for job {} due to {:?}",
+                        job_id, e
+                    );
+                }
                 self.state.clean_up_successful_job(job_id);
             }
             QueryStageSchedulerEvent::JobRunningFailed {
@@ -270,34 +196,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     .record_failed(&job_id, queued_at, failed_at);
 
                 error!("Job {} running failed", job_id);
-                let (running_tasks, _pending_tasks) = self
+                match self
                     .state
                     .task_manager
                     .abort_job(&job_id, fail_message)
-                    .await?;
-
-                if !running_tasks.is_empty() {
-                    tx_event
-                        .post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
-                        .await?;
+                    .await
+                {
+                    Ok((running_tasks, _pending_tasks)) => {
+                        if !running_tasks.is_empty() {
+                            event_sender
+                                .post_event(QueryStageSchedulerEvent::CancelTasks(
+                                    running_tasks,
+                                ))
+                                .await?;
+                        }
+                    }
+                    Err(e) => {
+                        error!(
+                            "Fail to invoke abort_job for job {} due to {:?}",
+                            job_id, e
+                        );
+                    }
                 }
                 self.state.clean_up_failed_job(job_id);
             }
             QueryStageSchedulerEvent::JobUpdated(job_id) => {
                 info!("Job {} Updated", job_id);
-                self.state.task_manager.update_job(&job_id).await?;
+                if let Err(e) = self.state.task_manager.update_job(&job_id).await {
+                    error!(
+                        "Fail to invoke update_job for job {} due to {:?}",
+                        job_id, e
+                    );
+                }
             }
             QueryStageSchedulerEvent::JobCancel(job_id) => {
                 self.metrics_collector.record_cancelled(&job_id);
 
                 info!("Job {} Cancelled", job_id);
-                let (running_tasks, _pending_tasks) =
-                    self.state.task_manager.cancel_job(&job_id).await?;
+                match self.state.task_manager.cancel_job(&job_id).await {
+                    Ok((running_tasks, _pending_tasks)) => {
+                        event_sender
+                            .post_event(QueryStageSchedulerEvent::CancelTasks(
+                                running_tasks,
+                            ))
+                            .await?;
+                    }
+                    Err(e) => {
+                        error!(
+                            "Fail to invoke cancel_job for job {} due to {:?}",
+                            job_id, e
+                        );
+                    }
+                }
                 self.state.clean_up_failed_job(job_id);
-
-                tx_event
-                    .post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
-                    .await?;
             }
             QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
                 debug!(
@@ -306,22 +257,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 );
 
                 let num_status = tasks_status.len();
+                if self.state.config.is_push_staged_scheduling() {
+                    self.state
+                        .executor_manager
+                        .unbind_tasks(vec![(executor_id.clone(), num_status as u32)])
+                        .await?;
+                }
                 match self
                     .state
                     .update_task_statuses(&executor_id, tasks_status)
                     .await
                 {
-                    Ok((stage_events, offers)) => {
+                    Ok(stage_events) => {
                         if self.state.config.is_push_staged_scheduling() {
-                            tx_event
-                                .post_event(
-                                    QueryStageSchedulerEvent::ReservationOffering(offers),
-                                )
+                            event_sender
+                                .post_event(QueryStageSchedulerEvent::ReviveOffers)
                                 .await?;
                         }
 
                         for stage_event in stage_events {
-                            tx_event.post_event(stage_event).await?;
+                            event_sender.post_event(stage_event).await?;
                         }
                     }
                     Err(e) => {
@@ -333,27 +288,21 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     }
                 }
             }
-            QueryStageSchedulerEvent::ReservationOffering(reservations) => {
-                let (reservations, pending) =
-                    self.state.offer_reservation(reservations).await?;
-
-                self.set_pending_tasks(pending);
-
-                if !reservations.is_empty() {
-                    tx_event
-                        .post_event(QueryStageSchedulerEvent::ReservationOffering(
-                            reservations,
-                        ))
-                        .await?;
-                }
+            QueryStageSchedulerEvent::ReviveOffers => {
+                self.state.revive_offers(event_sender).await?;
             }
             QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => {
                 match self.state.task_manager.executor_lost(&executor_id).await {
                     Ok(tasks) => {
                         if !tasks.is_empty() {
-                            tx_event
-                                .post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
-                                .await?;
+                            if let Err(e) = self
+                                .state
+                                .executor_manager
+                                .cancel_running_tasks(tasks)
+                                .await
+                            {
+                                warn!("Fail to cancel running tasks due to {:?}", e);
+                            }
                         }
                     }
                     Err(e) => {
@@ -365,10 +314,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 }
             }
             QueryStageSchedulerEvent::CancelTasks(tasks) => {
-                self.state
+                if let Err(e) = self
+                    .state
                     .executor_manager
                     .cancel_running_tasks(tasks)
-                    .await?;
+                    .await
+                {
+                    warn!("Failed to cancel running tasks due to {:?}", e);
+                }
             }
             QueryStageSchedulerEvent::JobDataClean(job_id) => {
                 self.state.executor_manager.clean_up_job_data(job_id);
@@ -397,75 +350,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 #[cfg(test)]
 mod tests {
     use crate::config::SchedulerConfig;
-    use crate::scheduler_server::event::QueryStageSchedulerEvent;
     use crate::test_utils::{await_condition, SchedulerTest, TestMetricsCollector};
     use ballista_core::config::TaskSchedulingPolicy;
     use ballista_core::error::Result;
-    use ballista_core::event_loop::EventAction;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum, LogicalPlan};
-    use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
     use tracing_subscriber::EnvFilter;
 
     #[tokio::test]
-    async fn test_job_resubmit() -> Result<()> {
-        let plan = test_plan(10);
-
-        let metrics_collector = Arc::new(TestMetricsCollector::default());
-
-        // Set resubmit interval of 1ms
-        let mut test = SchedulerTest::new(
-            SchedulerConfig::default()
-                .with_job_resubmit_interval_ms(1)
-                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
-            metrics_collector.clone(),
-            0,
-            0,
-            None,
-        )
-        .await?;
-
-        test.submit("job-id", "job-name", &plan).await?;
-
-        let query_stage_scheduler = test.query_stage_scheduler();
-
-        let (tx, mut rx) = tokio::sync::mpsc::channel::<QueryStageSchedulerEvent>(10);
-
-        let event = QueryStageSchedulerEvent::JobSubmitted {
-            job_id: "job-id".to_string(),
-            job_name: "job-name".to_string(),
-            session_id: "session-id".to_string(),
-            queued_at: 0,
-            submitted_at: 0,
-            resubmit: false,
-            plan: Arc::new(EmptyExec::new(false, Arc::new(test_schema()))),
-        };
-
-        // Mock the JobQueued work.
-        query_stage_scheduler
-            .state
-            .task_manager
-            .queue_job("job-id", "job-name", 0)
-            .await?;
-
-        query_stage_scheduler.on_receive(event, &tx, &rx).await?;
-
-        let next_event = rx.recv().await.unwrap();
-
-        dbg!(next_event.clone());
-        assert!(matches!(
-            next_event,
-            QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if job_id == "job-id" && resubmit
-        ));
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_pending_task_metric() -> Result<()> {
+    async fn test_pending_job_metric() -> Result<()> {
         tracing_subscriber::fmt()
             .with_env_filter(EnvFilter::from_default_env())
             .init();
@@ -484,92 +380,53 @@ mod tests {
         )
         .await?;
 
-        test.submit("job-1", "", &plan).await?;
-
-        // First stage has 10 tasks, one of which should be scheduled immediately
-        expect_pending_tasks(&test, 9).await;
-
-        test.tick().await?;
-
-        // First task completes and another should be scheduler, so we should have 8
-        expect_pending_tasks(&test, 8).await;
-
-        // Complete the 8 remaining tasks in the first stage
-        for _ in 0..8 {
-            test.tick().await?;
-        }
-
-        // The second stage should be resolved so we should have a new pending task
-        expect_pending_tasks(&test, 1).await;
-
-        // complete the final task
-        test.tick().await?;
-
-        expect_pending_tasks(&test, 0).await;
-
-        // complete the final task
-        test.tick().await?;
-
-        // Job should be finished now
-        let _ = test.await_completion_timeout("job-1", 5_000).await?;
-
-        Ok(())
-    }
-
-    #[ignore]
-    #[tokio::test]
-    async fn test_pending_task_metric_on_cancellation() -> Result<()> {
-        let plan = test_plan(10);
-
-        let metrics_collector = Arc::new(TestMetricsCollector::default());
-
-        let mut test = SchedulerTest::new(
-            SchedulerConfig::default()
-                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
-            metrics_collector.clone(),
-            1,
-            1,
-            None,
-        )
-        .await?;
-
-        test.submit("job-1", "", &plan).await?;
+        let job_id = "job-1";
 
-        // First stage has 10 tasks, one of which should be scheduled immediately
-        expect_pending_tasks(&test, 9).await;
+        test.submit(job_id, "", &plan).await?;
 
         test.tick().await?;
 
-        // First task completes and another should be scheduler, so we should have 8
-        expect_pending_tasks(&test, 8).await;
-
-        test.cancel("job-1").await?;
+        let pending_jobs = test.pending_job_number();
+        let expected = 0usize;
+        assert_eq!(
+            expected, pending_jobs,
+            "Expected {} pending jobs but found {}",
+            expected, pending_jobs
+        );
 
-        // First task completes and another should be scheduler, so we should have 8
-        expect_pending_tasks(&test, 0).await;
+        let running_jobs = test.running_job_number();
+        let expected = 1usize;
+        assert_eq!(
+            expected, running_jobs,
+            "Expected {} running jobs but found {}",
+            expected, running_jobs
+        );
 
-        Ok(())
-    }
+        test.cancel(job_id).await?;
 
-    async fn expect_pending_tasks(test: &SchedulerTest, expected: usize) {
-        let success = await_condition(Duration::from_millis(500), 20, || {
-            let pending_tasks = test.pending_tasks();
+        let expected = 0usize;
+        let success = await_condition(Duration::from_millis(10), 20, || {
+            let running_jobs = test.running_job_number();
 
-            futures::future::ready(Ok(pending_tasks == expected))
+            futures::future::ready(Ok(running_jobs == expected))
         })
         .await
         .unwrap();
-
         assert!(
             success,
-            "Expected {} pending tasks but found {}",
+            "Expected {} running jobs but found {}",
             expected,
-            test.pending_tasks()
+            test.running_job_number()
         );
+
+        Ok(())
     }
 
     fn test_plan(partitions: usize) -> LogicalPlan {
-        let schema = test_schema();
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Utf8, false),
+            Field::new("gmv", DataType::UInt64, false),
+        ]);
 
         scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
             .unwrap()
@@ -578,11 +435,4 @@ mod tests {
             .build()
             .unwrap()
     }
-
-    fn test_schema() -> Schema {
-        Schema::new(vec![
-            Field::new("id", DataType::Utf8, false),
-            Field::new("gmv", DataType::UInt64, false),
-        ])
-    }
 }
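
With ExecutorReservation removed, the push-based path above is driven by a single
ReviveOffers event: slots freed by finished tasks are returned through unbind_tasks,
and the next ReviveOffers pass binds ready tasks to whichever alive executors still
have capacity, according to the configured task distribution (bias or round-robin).
The following is a minimal, self-contained sketch of the round-robin idea only;
SlotState and bind_round_robin are simplified stand-ins for illustration, not the
actual Ballista binding code.

// Hypothetical, simplified slot bookkeeping for one executor.
#[derive(Debug, Clone)]
struct SlotState {
    executor_id: String,
    available_slots: u32,
}

/// Bind up to `num_tasks` tasks one slot at a time in round-robin order,
/// returning (executor_id, bound_task_count) pairs.
fn bind_round_robin(slots: &mut [SlotState], num_tasks: u32) -> Vec<(String, u32)> {
    let mut bound: Vec<(String, u32)> =
        slots.iter().map(|s| (s.executor_id.clone(), 0)).collect();
    let mut remaining = num_tasks;
    let mut progressed = true;
    while remaining > 0 && progressed {
        progressed = false;
        for (slot, entry) in slots.iter_mut().zip(bound.iter_mut()) {
            if remaining == 0 {
                break;
            }
            if slot.available_slots > 0 {
                slot.available_slots -= 1;
                entry.1 += 1;
                remaining -= 1;
                progressed = true;
            }
        }
    }
    bound.retain(|(_, n)| *n > 0);
    bound
}

fn main() {
    let mut slots = vec![
        SlotState { executor_id: "executor-0".into(), available_slots: 2 },
        SlotState { executor_id: "executor-1".into(), available_slots: 4 },
    ];
    // Five tasks are spread across both executors instead of saturating the first one:
    // [("executor-0", 2), ("executor-1", 3)]
    println!("{:?}", bind_round_robin(&mut slots, 5));
}
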
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index c00363d7..dde1268c 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -48,6 +48,7 @@ use crate::display::print_stage_metrics;
 use crate::planner::DistributedPlanner;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::timestamp_millis;
+use crate::state::execution_graph::execution_stage::RunningStage;
 pub(crate) use crate::state::execution_graph::execution_stage::{
     ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage, TaskInfo,
     UnresolvedStage,
@@ -196,8 +197,8 @@ impl ExecutionGraph {
         self.session_id.as_str()
     }
 
-    pub fn status(&self) -> JobStatus {
-        self.status.clone()
+    pub fn status(&self) -> &JobStatus {
+        &self.status
     }
 
     pub fn start_time(&self) -> u64 {
@@ -930,6 +931,64 @@ impl ExecutionGraph {
         Ok(next_task)
     }
 
+    pub(crate) fn fetch_running_stage(
+        &mut self,
+        black_list: &[usize],
+    ) -> Option<(&mut RunningStage, &mut usize)> {
+        if matches!(
+            self.status,
+            JobStatus {
+                status: Some(job_status::Status::Failed(_)),
+                ..
+            }
+        ) {
+            warn!("fetch_running_stage called on a failed job");
+            return None;
+        }
+
+        let running_stage_id = self.get_running_stage_id(black_list);
+        if let Some(running_stage_id) = running_stage_id {
+            if let Some(ExecutionStage::Running(running_stage)) =
+                self.stages.get_mut(&running_stage_id)
+            {
+                Some((running_stage, &mut self.task_id_gen))
+            } else {
+                warn!("Fail to find running stage with id {running_stage_id}");
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
+        let mut running_stage_id = self.stages.iter().find_map(|(stage_id, stage)| {
+            if black_list.contains(stage_id) {
+                None
+            } else if let ExecutionStage::Running(stage) = stage {
+                if stage.available_tasks() > 0 {
+                    Some(*stage_id)
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        });
+
+        // If no available tasks found in the running stage,
+        // try to find a resolved stage and convert it to the running stage
+        if running_stage_id.is_none() {
+            if self.revive() {
+                running_stage_id = self.get_running_stage_id(black_list);
+            } else {
+                running_stage_id = None;
+            }
+        }
+
+        running_stage_id
+    }
+
     pub fn update_status(&mut self, status: JobStatus) {
         self.status = status;
     }
@@ -1430,6 +1489,22 @@ impl Debug for ExecutionGraph {
     }
 }
 
+pub(crate) fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
+    TaskInfo {
+        task_id,
+        scheduled_time: SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis(),
+        // These times will be updated when the task finishes
+        launch_time: 0,
+        start_exec_time: 0,
+        end_exec_time: 0,
+        finish_time: 0,
+        task_status: task_status::Status::Running(RunningTask { executor_id }),
+    }
+}
+
 /// Utility for building a set of `ExecutionStage`s from
 /// a list of `ShuffleWriterExec`.
 ///
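
For context, get_running_stage_id above reduces to picking the first running stage,
by stage id, that is not black-listed and still has available tasks, falling back to
revive() to promote a resolved stage when none qualifies. Below is a minimal sketch
of just that selection criterion; the BTreeMap of stage id to available task count is
a hypothetical stand-in for the execution graph's stage map.

use std::collections::BTreeMap;

/// Pick the first stage (by id) that is not black-listed and still has
/// schedulable tasks; `None` means a revive/promotion pass is needed.
fn pick_running_stage(
    available_tasks: &BTreeMap<usize, usize>,
    black_list: &[usize],
) -> Option<usize> {
    available_tasks
        .iter()
        .find(|&(stage_id, tasks)| !black_list.contains(stage_id) && *tasks > 0)
        .map(|(stage_id, _)| *stage_id)
}

fn main() {
    let mut stages = BTreeMap::new();
    stages.insert(1, 0); // running, but all of its tasks are already bound
    stages.insert(2, 3); // has tasks, but black-listed below
    stages.insert(3, 5);

    assert_eq!(pick_running_stage(&stages, &[2]), Some(3));
}
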
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 7a6bf461..eed1dba0 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -17,18 +17,19 @@
 
 use std::time::Duration;
 
-#[cfg(not(test))]
 use ballista_core::error::BallistaError;
 use ballista_core::error::Result;
 use ballista_core::serde::protobuf;
 
-use crate::cluster::ClusterState;
+use crate::cluster::{BoundTask, ClusterState, ExecutorSlot};
 use crate::config::SchedulerConfig;
 
 use crate::state::execution_graph::RunningTaskInfo;
+use crate::state::task_manager::JobInfoCache;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
-    executor_status, CancelTasksParams, ExecutorHeartbeat, RemoveJobDataParams,
+    executor_status, CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition,
+    RemoveJobDataParams, StopExecutorParams,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use ballista_core::utils::{create_grpc_client_connection, get_time_before};
@@ -40,42 +41,6 @@ use tonic::transport::Channel;
 
 type ExecutorClients = Arc<DashMap<String, ExecutorGrpcClient<Channel>>>;
 
-/// Represents a task slot that is reserved (i.e. available for scheduling but not visible to the
-/// rest of the system).
-/// When tasks finish we want to preferentially assign new tasks from the same job, so the reservation
-/// can already be assigned to a particular job ID. In that case, the scheduler will try to schedule
-/// available tasks for that job to the reserved task slot.
-#[derive(Clone, Debug)]
-pub struct ExecutorReservation {
-    pub executor_id: String,
-    pub job_id: Option<String>,
-}
-
-impl ExecutorReservation {
-    pub fn new_free(executor_id: String) -> Self {
-        Self {
-            executor_id,
-            job_id: None,
-        }
-    }
-
-    pub fn new_assigned(executor_id: String, job_id: String) -> Self {
-        Self {
-            executor_id,
-            job_id: Some(job_id),
-        }
-    }
-
-    pub fn assign(mut self, job_id: String) -> Self {
-        self.job_id = Some(job_id);
-        self
-    }
-
-    pub fn assigned(&self) -> bool {
-        self.job_id.is_some()
-    }
-}
-
 #[derive(Clone)]
 pub struct ExecutorManager {
     cluster_state: Arc<dyn ClusterState>,
@@ -101,66 +66,76 @@ impl ExecutorManager {
         Ok(())
     }
 
-    /// Reserve up to n executor task slots. Once reserved these slots will not be available
-    /// for scheduling.
-    /// This operation is atomic, so if this method return an Err, no slots have been reserved.
-    pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+    /// Bind the ready-to-run tasks from [`active_jobs`] to available executors.
+    ///
+    /// Only task slots on executors that are currently alive are used for binding.
+    pub async fn bind_schedulable_tasks(
+        &self,
+        active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    ) -> Result<Vec<BoundTask>> {
+        if active_jobs.is_empty() {
+            warn!("There are no active jobs to bind tasks for");
+            return Ok(vec![]);
+        }
         let alive_executors = self.get_alive_executors();
-
-        debug!("Alive executors: {alive_executors:?}");
-
+        if alive_executors.is_empty() {
+            warn!("There are no alive executors to bind tasks to");
+            return Ok(vec![]);
+        }
         self.cluster_state
-            .reserve_slots(n, self.config.task_distribution, Some(alive_executors))
+            .bind_schedulable_tasks(
+                self.config.task_distribution,
+                active_jobs,
+                Some(alive_executors),
+            )
             .await
     }
 
     /// Return task slots back to the pool of available slots. This operation is atomic,
     /// so either the entire set of slots is returned or none are.
-    pub async fn cancel_reservations(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
-        self.cluster_state.cancel_reservations(reservations).await
+    pub async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
+        self.cluster_state.unbind_tasks(executor_slots).await
     }
 
     /// Send rpc to Executors to cancel the running tasks
     pub async fn cancel_running_tasks(&self, tasks: Vec<RunningTaskInfo>) -> Result<()> {
-        let mut tasks_to_cancel: HashMap<&str, Vec<protobuf::RunningTaskInfo>> =
+        let mut tasks_to_cancel: HashMap<String, Vec<protobuf::RunningTaskInfo>> =
             Default::default();
 
-        for task_info in &tasks {
-            if let Some(infos) = tasks_to_cancel.get_mut(task_info.executor_id.as_str()) {
-                infos.push(protobuf::RunningTaskInfo {
-                    task_id: task_info.task_id as u32,
-                    job_id: task_info.job_id.clone(),
-                    stage_id: task_info.stage_id as u32,
-                    partition_id: task_info.partition_id as u32,
-                })
-            } else {
-                tasks_to_cancel.insert(
-                    task_info.executor_id.as_str(),
-                    vec![protobuf::RunningTaskInfo {
-                        task_id: task_info.task_id as u32,
-                        job_id: task_info.job_id.clone(),
-                        stage_id: task_info.stage_id as u32,
-                        partition_id: task_info.partition_id as u32,
-                    }],
-                );
-            }
+        for task_info in tasks {
+            let infos = tasks_to_cancel
+                .entry(task_info.executor_id)
+                .or_insert_with(Vec::new);
+            infos.push(protobuf::RunningTaskInfo {
+                task_id: task_info.task_id as u32,
+                job_id: task_info.job_id,
+                stage_id: task_info.stage_id as u32,
+                partition_id: task_info.partition_id as u32,
+            });
         }
 
-        for (executor_id, infos) in tasks_to_cancel {
-            if let Ok(mut client) = self.get_client(executor_id).await {
-                client
-                    .cancel_tasks(CancelTasksParams { task_infos: infos })
-                    .await?;
-            } else {
-                error!(
-                    "Failed to get client for executor ID {} to cancel tasks",
-                    executor_id
-                )
+        let executor_manager = self.clone();
+        tokio::spawn(async move {
+            for (executor_id, infos) in tasks_to_cancel {
+                if let Ok(mut client) = executor_manager.get_client(&executor_id).await {
+                    if let Err(e) = client
+                        .cancel_tasks(CancelTasksParams { task_infos: infos })
+                        .await
+                    {
+                        error!(
+                            "Failed to cancel tasks for executor ID {} due to {:?}",
+                            executor_id, e
+                        );
+                    }
+                } else {
+                    error!(
+                        "Failed to get client for executor ID {} to cancel tasks",
+                        executor_id
+                    )
+                }
             }
-        }
+        });
+
         Ok(())
     }
 
@@ -218,30 +193,6 @@ impl ExecutorManager {
         }
     }
 
-    pub async fn get_client(
-        &self,
-        executor_id: &str,
-    ) -> Result<ExecutorGrpcClient<Channel>> {
-        let client = self.clients.get(executor_id).map(|value| value.clone());
-
-        if let Some(client) = client {
-            Ok(client)
-        } else {
-            let executor_metadata = self.get_executor_metadata(executor_id).await?;
-            let executor_url = format!(
-                "http://{}:{}",
-                executor_metadata.host, executor_metadata.grpc_port
-            );
-            let connection = create_grpc_client_connection(executor_url).await?;
-            let client = ExecutorGrpcClient::new(connection);
-
-            {
-                self.clients.insert(executor_id.to_owned(), client.clone());
-            }
-            Ok(client)
-        }
-    }
-
     /// Get a list of all executors along with the timestamp of their last recorded heartbeat
     pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, Duration)>> {
         let heartbeat_timestamps: Vec<(String, u64)> = self
@@ -263,6 +214,7 @@ impl ExecutorManager {
         Ok(state)
     }
 
+    /// Get executor metadata for the provided executor ID. Returns an error if the executor does not exist
     pub async fn get_executor_metadata(
         &self,
         executor_id: &str,
@@ -270,56 +222,38 @@ impl ExecutorManager {
         self.cluster_state.get_executor_metadata(executor_id).await
     }
 
+    /// This is only used for pull-based task scheduling.
+    ///
+    /// For push-based scheduling, use [`register_executor`] instead.
     pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
         self.cluster_state.save_executor_metadata(metadata).await
     }
 
-    /// Register the executor with the scheduler. This will save the executor metadata and the
-    /// executor data to persistent state.
+    /// Register the executor with the scheduler.
     ///
-    /// If `reserve` is true, then any available task slots will be reserved and dispatched for scheduling.
-    /// If `reserve` is false, then the executor data will be saved as is.
+    /// This will save the executor metadata and the executor data to persistent state.
     ///
-    /// In general, reserve should be true is the scheduler is using push-based scheduling and false
-    /// if the scheduler is using pull-based scheduling.
+    /// This is only used for push-based task scheduling.
     pub async fn register_executor(
         &self,
         metadata: ExecutorMetadata,
         specification: ExecutorData,
-        reserve: bool,
-    ) -> Result<Vec<ExecutorReservation>> {
+    ) -> Result<()> {
         debug!(
             "registering executor {} with {} task slots",
             metadata.id, specification.total_task_slots
         );
 
-        self.test_scheduler_connectivity(&metadata).await?;
+        ExecutorManager::test_connectivity(&metadata).await?;
 
-        if !reserve {
-            self.cluster_state
-                .register_executor(metadata, specification.clone(), reserve)
-                .await?;
-
-            Ok(vec![])
-        } else {
-            let mut specification = specification;
-            let num_slots = specification.available_task_slots as usize;
-            let mut reservations: Vec<ExecutorReservation> = vec![];
-            for _ in 0..num_slots {
-                reservations.push(ExecutorReservation::new_free(metadata.id.clone()));
-            }
-
-            specification.available_task_slots = 0;
-
-            self.cluster_state
-                .register_executor(metadata, specification, reserve)
-                .await?;
+        self.cluster_state
+            .register_executor(metadata, specification)
+            .await?;
 
-            Ok(reservations)
-        }
+        Ok(())
     }
 
-    /// Remove the executor within the scheduler.
+    /// Remove the executor from the cluster.
     pub async fn remove_executor(
         &self,
         executor_id: &str,
@@ -329,29 +263,55 @@ impl ExecutorManager {
         self.cluster_state.remove_executor(executor_id).await
     }
 
-    #[cfg(not(test))]
-    async fn test_scheduler_connectivity(
+    pub async fn stop_executor(&self, executor_id: &str, stop_reason: String) {
+        let executor_id = executor_id.to_string();
+        match self.get_client(&executor_id).await {
+            Ok(mut client) => {
+                tokio::task::spawn(async move {
+                    match client
+                        .stop_executor(StopExecutorParams {
+                            executor_id: executor_id.to_string(),
+                            reason: stop_reason,
+                            force: true,
+                        })
+                        .await
+                    {
+                        Err(error) => {
+                            warn!("Failed to send stop_executor rpc due to {}", error);
+                        }
+                        Ok(_value) => {}
+                    }
+                });
+            }
+            Err(_) => {
+                warn!(
+                    "Executor is already dead, failed to connect to Executor {}",
+                    executor_id
+                );
+            }
+        }
+    }
+
+    pub async fn launch_multi_task(
         &self,
-        metadata: &ExecutorMetadata,
+        executor_id: &str,
+        multi_tasks: Vec<MultiTaskDefinition>,
+        scheduler_id: String,
     ) -> Result<()> {
-        let executor_url = format!("http://{}:{}", metadata.host, metadata.grpc_port);
-        debug!("Connecting to executor {:?}", executor_url);
-        let _ = protobuf::executor_grpc_client::ExecutorGrpcClient::connect(executor_url)
+        let mut client = self.get_client(executor_id).await?;
+        client
+            .launch_multi_task(protobuf::LaunchMultiTaskParams {
+                multi_tasks,
+                scheduler_id,
+            })
             .await
             .map_err(|e| {
                 BallistaError::Internal(format!(
-                    "Failed to register executor at {}:{}, could not connect: {:?}",
-                    metadata.host, metadata.grpc_port, e
+                    "Failed to connect to executor {}: {:?}",
+                    executor_id, e
                 ))
             })?;
-        Ok(())
-    }
 
-    #[cfg(test)]
-    async fn test_scheduler_connectivity(
-        &self,
-        _metadata: &ExecutorMetadata,
-    ) -> Result<()> {
         Ok(())
     }
 
@@ -433,289 +393,45 @@ impl ExecutorManager {
             })
             .collect::<Vec<_>>()
     }
-}
-
-#[cfg(test)]
-mod test {
-    use crate::config::{SchedulerConfig, TaskDistribution};
-    use std::sync::Arc;
-
-    use crate::scheduler_server::timestamp_secs;
-    use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
-    use crate::test_utils::test_cluster_context;
-    use ballista_core::error::Result;
-    use ballista_core::serde::protobuf::executor_status::Status;
-    use ballista_core::serde::protobuf::{ExecutorHeartbeat, ExecutorStatus};
-    use ballista_core::serde::scheduler::{
-        ExecutorData, ExecutorMetadata, ExecutorSpecification,
-    };
-
-    #[tokio::test]
-    async fn test_reserve_and_cancel() -> Result<()> {
-        test_reserve_and_cancel_inner(TaskDistribution::Bias).await?;
-        test_reserve_and_cancel_inner(TaskDistribution::RoundRobin).await?;
-
-        Ok(())
-    }
-
-    async fn test_reserve_and_cancel_inner(
-        task_distribution: TaskDistribution,
-    ) -> Result<()> {
-        let cluster = test_cluster_context();
-
-        let config = SchedulerConfig::default().with_task_distribution(task_distribution);
-        let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
-        let executors = test_executors(10, 4);
-
-        for (executor_metadata, executor_data) in executors {
-            executor_manager
-                .register_executor(executor_metadata, executor_data, false)
-                .await?;
-        }
-
-        // Reserve all the slots
-        let reservations = executor_manager.reserve_slots(40).await?;
-
-        assert_eq!(
-            reservations.len(),
-            40,
-            "Expected 40 reservations for policy {task_distribution:?}"
-        );
-
-        // Now cancel them
-        executor_manager.cancel_reservations(reservations).await?;
-
-        // Now reserve again
-        let reservations = executor_manager.reserve_slots(40).await?;
-
-        assert_eq!(
-            reservations.len(),
-            40,
-            "Expected 40 reservations for policy {task_distribution:?}"
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_reserve_partial() -> Result<()> {
-        test_reserve_partial_inner(TaskDistribution::Bias).await?;
-        test_reserve_partial_inner(TaskDistribution::RoundRobin).await?;
-
-        Ok(())
-    }
-
-    async fn test_reserve_partial_inner(
-        task_distribution: TaskDistribution,
-    ) -> Result<()> {
-        let cluster = test_cluster_context();
-
-        let config = SchedulerConfig::default().with_task_distribution(task_distribution);
-        let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
-        let executors = test_executors(10, 4);
-
-        for (executor_metadata, executor_data) in executors {
-            executor_manager
-                .register_executor(executor_metadata, executor_data, false)
-                .await?;
-        }
-
-        // Reserve all the slots
-        let reservations = executor_manager.reserve_slots(30).await?;
-
-        assert_eq!(reservations.len(), 30);
-
-        // Try to reserve 30 more. Only ten are available though so we should only get 10
-        let more_reservations = executor_manager.reserve_slots(30).await?;
-
-        assert_eq!(more_reservations.len(), 10);
-
-        // Now cancel them
-        executor_manager.cancel_reservations(reservations).await?;
-        executor_manager
-            .cancel_reservations(more_reservations)
-            .await?;
-
-        // Now reserve again
-        let reservations = executor_manager.reserve_slots(40).await?;
-
-        assert_eq!(reservations.len(), 40);
-
-        let more_reservations = executor_manager.reserve_slots(30).await?;
-
-        assert_eq!(more_reservations.len(), 0);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_reserve_concurrent() -> Result<()> {
-        test_reserve_concurrent_inner(TaskDistribution::Bias).await?;
-        test_reserve_concurrent_inner(TaskDistribution::RoundRobin).await?;
-
-        Ok(())
-    }
-
-    async fn test_reserve_concurrent_inner(
-        task_distribution: TaskDistribution,
-    ) -> Result<()> {
-        let (sender, mut receiver) =
-            tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
 
-        let executors = test_executors(10, 4);
-
-        let config = SchedulerConfig::default().with_task_distribution(task_distribution);
-        let cluster = test_cluster_context();
-        let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
+    async fn get_client(&self, executor_id: &str) -> Result<ExecutorGrpcClient<Channel>> {
+        let client = self.clients.get(executor_id).map(|value| value.clone());
 
-        for (executor_metadata, executor_data) in executors {
-            executor_manager
-                .register_executor(executor_metadata, executor_data, false)
-                .await?;
-        }
+        if let Some(client) = client {
+            Ok(client)
+        } else {
+            let executor_metadata = self.get_executor_metadata(executor_id).await?;
+            let executor_url = format!(
+                "http://{}:{}",
+                executor_metadata.host, executor_metadata.grpc_port
+            );
+            let connection = create_grpc_client_connection(executor_url).await?;
+            let client = ExecutorGrpcClient::new(connection);
 
-        {
-            let sender = sender;
-            // Spawn 20 async tasks to each try and reserve all 40 slots
-            for _ in 0..20 {
-                let executor_manager = executor_manager.clone();
-                let sender = sender.clone();
-                tokio::task::spawn(async move {
-                    let reservations = executor_manager.reserve_slots(40).await;
-                    sender.send(reservations).await.unwrap();
-                });
+            {
+                self.clients.insert(executor_id.to_owned(), client.clone());
             }
+            Ok(client)
         }
-
-        let mut total_reservations: Vec<ExecutorReservation> = vec![];
-
-        while let Some(Ok(reservations)) = receiver.recv().await {
-            total_reservations.extend(reservations);
-        }
-
-        // The total number of reservations should never exceed the number of slots
-        assert_eq!(total_reservations.len(), 40);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_register_reserve() -> Result<()> {
-        test_register_reserve_inner(TaskDistribution::Bias).await?;
-        test_register_reserve_inner(TaskDistribution::RoundRobin).await?;
-
-        Ok(())
-    }
-
-    async fn test_register_reserve_inner(
-        task_distribution: TaskDistribution,
-    ) -> Result<()> {
-        let cluster = test_cluster_context();
-
-        let config = SchedulerConfig::default().with_task_distribution(task_distribution);
-        let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
-        let executors = test_executors(10, 4);
-
-        for (executor_metadata, executor_data) in executors {
-            let reservations = executor_manager
-                .register_executor(executor_metadata, executor_data, true)
-                .await?;
-
-            assert_eq!(reservations.len(), 4);
-        }
-
-        // All slots should be reserved
-        let reservations = executor_manager.reserve_slots(1).await?;
-
-        assert_eq!(reservations.len(), 0);
-
-        Ok(())
     }
 
-    #[tokio::test]
-    async fn test_ignore_fenced_executors() -> Result<()> {
-        test_ignore_fenced_executors_inner(TaskDistribution::Bias).await?;
-        test_ignore_fenced_executors_inner(TaskDistribution::RoundRobin).await?;
-
+    #[cfg(not(test))]
+    async fn test_connectivity(metadata: &ExecutorMetadata) -> Result<()> {
+        let executor_url = format!("http://{}:{}", metadata.host, metadata.grpc_port);
+        debug!("Connecting to executor {:?}", executor_url);
+        let _ = protobuf::executor_grpc_client::ExecutorGrpcClient::connect(executor_url)
+            .await
+            .map_err(|e| {
+                BallistaError::Internal(format!(
+                    "Failed to register executor at {}:{}, could not connect: {:?}",
+                    metadata.host, metadata.grpc_port, e
+                ))
+            })?;
         Ok(())
     }
 
-    async fn test_ignore_fenced_executors_inner(
-        task_distribution: TaskDistribution,
-    ) -> Result<()> {
-        let cluster = test_cluster_context();
-
-        let config = SchedulerConfig::default().with_task_distribution(task_distribution);
-        let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
-        // Setup two executors initially
-        let executors = test_executors(2, 4);
-
-        for (executor_metadata, executor_data) in executors {
-            let _ = executor_manager
-                .register_executor(executor_metadata, executor_data, false)
-                .await?;
-        }
-
-        // Fence one of the executors
-        executor_manager
-            .save_executor_heartbeat(ExecutorHeartbeat {
-                executor_id: "executor-0".to_string(),
-                timestamp: timestamp_secs(),
-                metrics: vec![],
-                status: Some(ExecutorStatus {
-                    status: Some(Status::Terminating(String::default())),
-                }),
-            })
-            .await?;
-
-        let reservations = executor_manager.reserve_slots(8).await?;
-
-        assert_eq!(reservations.len(), 4, "Expected only four reservations");
-
-        assert!(
-            reservations
-                .iter()
-                .all(|res| res.executor_id == "executor-1"),
-            "Expected all reservations from non-fenced executor",
-        );
-
+    #[cfg(test)]
+    async fn test_connectivity(_metadata: &ExecutorMetadata) -> Result<()> {
         Ok(())
     }
-
-    fn test_executors(
-        total_executors: usize,
-        slots_per_executor: u32,
-    ) -> Vec<(ExecutorMetadata, ExecutorData)> {
-        let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
-
-        for i in 0..total_executors {
-            result.push((
-                ExecutorMetadata {
-                    id: format!("executor-{i}"),
-                    host: format!("host-{i}"),
-                    port: 8080,
-                    grpc_port: 9090,
-                    specification: ExecutorSpecification {
-                        task_slots: slots_per_executor,
-                    },
-                },
-                ExecutorData {
-                    executor_id: format!("executor-{i}"),
-                    total_task_slots: slots_per_executor,
-                    available_task_slots: slots_per_executor,
-                },
-            ));
-        }
-
-        result
-    }
 }
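
Both cancel_running_tasks above and launch_tasks in state/mod.rs below begin by
regrouping a flat list of (executor_id, task) pairs, first by executor and then by
(job_id, stage_id), which is the shape needed to build one MultiTaskDefinition per
stage per executor. A minimal sketch of that grouping using the HashMap entry API
follows; Task is a hypothetical, trimmed-down stand-in for TaskDescription.

use std::collections::HashMap;

// Hypothetical, trimmed-down stand-in for TaskDescription.
#[derive(Debug, Clone)]
struct Task {
    job_id: String,
    stage_id: usize,
}

type StageKey = (String, usize);

/// Group (executor_id, task) pairs first by executor and then by (job, stage).
fn group_bound_tasks(
    bound_tasks: Vec<(String, Task)>,
) -> HashMap<String, HashMap<StageKey, Vec<Task>>> {
    let mut grouped: HashMap<String, HashMap<StageKey, Vec<Task>>> = HashMap::new();
    for (executor_id, task) in bound_tasks {
        let stage_key = (task.job_id.clone(), task.stage_id);
        grouped
            .entry(executor_id)
            .or_default()
            .entry(stage_key)
            .or_default()
            .push(task);
    }
    grouped
}

fn main() {
    let bound = vec![
        ("executor-0".to_string(), Task { job_id: "job-1".into(), stage_id: 1 }),
        ("executor-0".to_string(), Task { job_id: "job-1".into(), stage_id: 1 }),
        ("executor-1".to_string(), Task { job_id: "job-1".into(), stage_id: 2 }),
    ];
    let grouped = group_bound_tasks(bound);
    assert_eq!(grouped["executor-0"][&("job-1".to_string(), 1)].len(), 2);
}
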
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 2ad2d707..957bbcfe 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -26,23 +26,23 @@ use std::time::Instant;
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
-use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
+use crate::state::executor_manager::ExecutorManager;
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::{TaskLauncher, TaskManager};
 
-use crate::cluster::BallistaCluster;
+use crate::cluster::{BallistaCluster, BoundTask, ExecutorSlot};
 use crate::config::SchedulerConfig;
 use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
+use ballista_core::event_loop::EventSender;
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::BallistaCodec;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use prost::Message;
 
 pub mod execution_graph;
@@ -155,139 +155,105 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         self.executor_manager.init().await
     }
 
-    pub(crate) async fn update_task_statuses(
+    pub(crate) async fn revive_offers(
         &self,
-        executor_id: &str,
-        tasks_status: Vec<TaskStatus>,
-    ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
-        let executor = self
+        sender: EventSender<QueryStageSchedulerEvent>,
+    ) -> Result<()> {
+        let schedulable_tasks = self
             .executor_manager
-            .get_executor_metadata(executor_id)
+            .bind_schedulable_tasks(self.task_manager.get_running_job_cache())
             .await?;
+        if schedulable_tasks.is_empty() {
+            warn!("No schedulable tasks found to be launched");
+            return Ok(());
+        }
 
-        let total_num_tasks = tasks_status.len();
-        let reservations = (0..total_num_tasks)
-            .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
-            .collect();
-
-        let events = self
-            .task_manager
-            .update_task_statuses(&executor, tasks_status)
-            .await?;
-
-        Ok((events, reservations))
-    }
-
-    /// Process reservations which are offered. The basic process is
-    /// 1. Attempt to fill the offered reservations with available tasks
-    /// 2. For any reservation that filled, launch the assigned task on the executor.
-    /// 3. For any reservations that could not be filled, cancel the reservation (i.e. return the
-    ///    task slot back to the pool of available task slots).
-    ///
-    /// NOTE Error handling in this method is very important. No matter what we need to ensure
-    /// that unfilled reservations are cancelled or else they could become permanently "invisible"
-    /// to the scheduler.
-    pub(crate) async fn offer_reservation(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<(Vec<ExecutorReservation>, usize)> {
-        let pending_tasks = match self.task_manager.fill_reservations(&reservations).await
-        {
-            Ok((assignments, unassigned_reservations, pending_tasks)) => {
-                let executor_stage_assignments = Self::combine_task(assignments);
-
-                self.spawn_tasks_and_persist_reservations_back(
-                    executor_stage_assignments,
-                    unassigned_reservations,
-                );
-
-                pending_tasks
+        let state = self.clone();
+        tokio::spawn(async move {
+            let mut if_revive = false;
+            match state.launch_tasks(schedulable_tasks).await {
+                Ok(unassigned_executor_slots) => {
+                    if !unassigned_executor_slots.is_empty() {
+                        if let Err(e) = state
+                            .executor_manager
+                            .unbind_tasks(unassigned_executor_slots)
+                            .await
+                        {
+                            error!("Failed to unbind tasks: {}", e);
+                        }
+                        if_revive = true;
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to launch tasks: {}", e);
+                    if_revive = true;
+                }
             }
-            // If error set all reservations back
-            Err(e) => {
-                error!("Error filling reservations: {:?}", e);
-                self.executor_manager
-                    .cancel_reservations(reservations)
-                    .await?;
-                0
+            if if_revive {
+                if let Err(e) = sender
+                    .post_event(QueryStageSchedulerEvent::ReviveOffers)
+                    .await
+                {
+                    error!("Failed to send revive offers event due to {:?}", e);
+                }
             }
-        };
-
-        let mut new_reservations = vec![];
-
-        if pending_tasks > 0 {
-            // If there are pending tasks available, try and schedule them
-            let pending_reservations = self
-                .executor_manager
-                .reserve_slots(pending_tasks as u32)
-                .await?;
-            new_reservations.extend(pending_reservations);
-        }
+        });
 
-        Ok((new_reservations, pending_tasks))
+        Ok(())
     }
 
-    fn spawn_tasks_and_persist_reservations_back(
+    /// Remove an executor.
+    /// 1. The executor-related info will be removed from [`ExecutorManager`]
+    /// 2. All affected running execution graphs will be rolled back
+    /// 3. All running tasks of the affected running stages will be cancelled
+    pub(crate) async fn remove_executor(
         &self,
-        executor_stage_assignments: HashMap<
-            String,
-            HashMap<(String, usize), Vec<TaskDescription>>,
-        >,
-        mut unassigned_reservations: Vec<ExecutorReservation>,
+        executor_id: &str,
+        reason: Option<String>,
     ) {
-        let task_manager = self.task_manager.clone();
-        let executor_manager = self.executor_manager.clone();
-
-        tokio::spawn(async move {
-            for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-                let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
-                // Total number of tasks to be launched for one executor
-                let n_tasks: usize =
-                    tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+        if let Err(e) = self
+            .executor_manager
+            .remove_executor(executor_id, reason)
+            .await
+        {
+            warn!("Failed to remove executor {}: {}", executor_id, e);
+        }
 
-                match executor_manager.get_executor_metadata(&executor_id).await {
-                    Ok(executor) => {
-                        if let Err(e) = task_manager
-                            .launch_multi_task(&executor, tasks, &executor_manager)
-                            .await
-                        {
-                            error!("Failed to launch new task: {:?}", e);
-                            // set resource back.
-                            unassigned_reservations.append(&mut vec![
-                                    ExecutorReservation::new_free(
-                                        executor_id.clone(),
-                                    );
-                                    n_tasks
-                                ]);
-                        }
+        match self.task_manager.executor_lost(executor_id).await {
+            Ok(tasks) => {
+                if !tasks.is_empty() {
+                    if let Err(e) =
+                        self.executor_manager.cancel_running_tasks(tasks).await
+                    {
+                        warn!("Failed to cancel running tasks due to {:?}", e);
                     }
-                    Err(e) => {
-                        error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                        // here no need set resource back.
-                    }
-                };
+                }
             }
-            if !unassigned_reservations.is_empty() {
-                // If any reserved slots remain, return them to the pool
-                executor_manager
-                    .cancel_reservations(unassigned_reservations)
-                    .await
-                    .expect("cancel_reservations fail!");
+            Err(e) => {
+                error!(
+                    "TaskManager failed to handle the loss of executor {}: {}",
+                    executor_id, e
+                );
             }
-        });
+        }
     }
 
-    // Put tasks to the same executor together
-    // And put tasks belonging to the same stage together for creating MultiTaskDefinition
-    // return a map of <executor_id, <stage_key, TaskDesc>>.
-    fn combine_task(
-        assignments: Vec<(String, TaskDescription)>,
-    ) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
+    /// Given a vector of bound tasks,
+    /// 1. First reorganize them by executor -> job stage -> tasks;
+    /// 2. Then launch each task set to its executor, one executor at a time.
+    ///
+    /// If launching a task set fails, the related [`ExecutorSlot`]s will be returned.
+    async fn launch_tasks(
+        &self,
+        bound_tasks: Vec<BoundTask>,
+    ) -> Result<Vec<ExecutorSlot>> {
+        // Group tasks bound to the same executor together,
+        // and within each executor group tasks of the same stage together for creating a MultiTaskDefinition
         let mut executor_stage_assignments: HashMap<
             String,
             HashMap<(String, usize), Vec<TaskDescription>>,
         > = HashMap::new();
-        for (executor_id, task) in assignments.into_iter() {
+        for (executor_id, task) in bound_tasks.into_iter() {
             let stage_key = (task.partition.job_id.clone(), task.partition.stage_id);
             if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id) {
                 if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
@@ -304,15 +270,90 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                 executor_stage_assignments.insert(executor_id, executor_stage_tasks);
             }
         }
-        executor_stage_assignments
+
+        let mut join_handles = vec![];
+        for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+            let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+            // Total number of tasks to be launched for one executor
+            let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
+            let state = self.clone();
+            let join_handle = tokio::spawn(async move {
+                let success = match state
+                    .executor_manager
+                    .get_executor_metadata(&executor_id)
+                    .await
+                {
+                    Ok(executor) => {
+                        if let Err(e) = state
+                            .task_manager
+                            .launch_multi_task(&executor, tasks, &state.executor_manager)
+                            .await
+                        {
+                            let err_msg = format!("Failed to launch new task: {e}");
+                            error!("{}", err_msg.clone());
+
+                            // It's OK to remove the executor aggressively,
+                            // since a healthy executor will simply register itself again.
+                            state.remove_executor(&executor_id, Some(err_msg)).await;
+
+                            false
+                        } else {
+                            true
+                        }
+                    }
+                    Err(e) => {
+                        error!("Failed to launch new task, could not get executor metadata: {}", e);
+                        false
+                    }
+                };
+                if success {
+                    vec![]
+                } else {
+                    vec![(executor_id.clone(), n_tasks as u32)]
+                }
+            });
+            join_handles.push(join_handle);
+        }
+
+        let unassigned_executor_slots =
+            futures::future::join_all(join_handles)
+                .await
+                .into_iter()
+                .collect::<std::result::Result<
+                    Vec<Vec<ExecutorSlot>>,
+                    tokio::task::JoinError,
+                >>()?;
+
+        Ok(unassigned_executor_slots
+            .into_iter()
+            .flatten()
+            .collect::<Vec<ExecutorSlot>>())
     }
 
-    pub(crate) async fn plan_job(
+    pub(crate) async fn update_task_statuses(
+        &self,
+        executor_id: &str,
+        tasks_status: Vec<TaskStatus>,
+    ) -> Result<Vec<QueryStageSchedulerEvent>> {
+        let executor = self
+            .executor_manager
+            .get_executor_metadata(executor_id)
+            .await?;
+
+        self.task_manager
+            .update_task_statuses(&executor, tasks_status)
+            .await
+    }
+
+    pub(crate) async fn submit_job(
         &self,
         job_id: &str,
+        job_name: &str,
         session_ctx: Arc<SessionContext>,
         plan: &LogicalPlan,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
+        queued_at: u64,
+    ) -> Result<()> {
         let start = Instant::now();
 
         if log::max_level() >= log::Level::Debug {
@@ -349,10 +390,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                             .map_err(|e| {
                                 DataFusionError::External(
                                     format!(
-                                    "logical plan refers to path on local file system \
+                                        "logical plan refers to path on local file system \
                                 that is not accessible in the scheduler: {url}: {e:?}"
-                                )
-                                    .into(),
+                                    )
+                                        .into(),
                                 )
                             })?;
                     }
@@ -367,11 +408,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             DisplayableExecutionPlan::new(plan.as_ref()).indent()
         );
 
+        self.task_manager
+            .submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
+            .await?;
+
         let elapsed = start.elapsed();
 
         info!("Planned job {} in {:?}", job_id, elapsed);
 
-        Ok(plan)
+        Ok(())
     }
 
     /// Spawn a delayed future to clean up job data on both Scheduler and Executors
@@ -395,320 +440,3 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         );
     }
 }
-
-#[cfg(test)]
-mod test {
-
-    use crate::state::SchedulerState;
-    use ballista_core::config::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
-    use ballista_core::error::Result;
-    use ballista_core::serde::protobuf::{
-        task_status, ShuffleWritePartition, SuccessfulTask, TaskStatus,
-    };
-    use ballista_core::serde::scheduler::{
-        ExecutorData, ExecutorMetadata, ExecutorSpecification,
-    };
-    use ballista_core::serde::BallistaCodec;
-
-    use crate::config::SchedulerConfig;
-
-    use crate::scheduler_server::timestamp_millis;
-    use crate::test_utils::{test_cluster_context, BlackholeTaskLauncher};
-    use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::logical_expr::{col, sum};
-    use datafusion::physical_plan::ExecutionPlan;
-    use datafusion::prelude::SessionContext;
-    use datafusion::test_util::scan_empty;
-    use datafusion_proto::protobuf::LogicalPlanNode;
-    use datafusion_proto::protobuf::PhysicalPlanNode;
-    use std::sync::Arc;
-
-    const TEST_SCHEDULER_NAME: &str = "localhost:50050";
-
-    // We should free any reservations which are not assigned
-    #[tokio::test]
-    async fn test_offer_free_reservations() -> Result<()> {
-        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_default_scheduler_name(
-                test_cluster_context(),
-                BallistaCodec::default(),
-            ));
-
-        let executors = test_executors(1, 4);
-
-        let (executor_metadata, executor_data) = executors[0].clone();
-
-        let reservations = state
-            .executor_manager
-            .register_executor(executor_metadata, executor_data, true)
-            .await?;
-
-        let (result, assigned) = state.offer_reservation(reservations).await?;
-
-        assert_eq!(assigned, 0);
-        assert!(result.is_empty());
-
-        // Need sleep wait for the spawn task work done.
-        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-        // All reservations should have been cancelled so we should be able to reserve them now
-        let reservations = state.executor_manager.reserve_slots(4).await?;
-
-        assert_eq!(reservations.len(), 4);
-
-        Ok(())
-    }
-
-    // We should fill unbound reservations to any available task
-    #[tokio::test]
-    async fn test_offer_fill_reservations() -> Result<()> {
-        let config = BallistaConfig::builder()
-            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
-            .build()?;
-
-        let scheduler_config = SchedulerConfig::default();
-        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_task_launcher(
-                test_cluster_context(),
-                BallistaCodec::default(),
-                TEST_SCHEDULER_NAME.into(),
-                Arc::new(scheduler_config),
-                Arc::new(BlackholeTaskLauncher::default()),
-            ));
-
-        let session_ctx = state.session_manager.create_session(&config).await?;
-
-        let plan = test_graph(session_ctx.clone()).await;
-
-        // Create 4 jobs so we have four pending tasks
-        state
-            .task_manager
-            .queue_job("job-1", "", timestamp_millis())
-            .await?;
-        state
-            .task_manager
-            .submit_job(
-                "job-1",
-                "",
-                session_ctx.session_id().as_str(),
-                plan.clone(),
-                0,
-            )
-            .await?;
-        state
-            .task_manager
-            .queue_job("job-2", "", timestamp_millis())
-            .await?;
-        state
-            .task_manager
-            .submit_job(
-                "job-2",
-                "",
-                session_ctx.session_id().as_str(),
-                plan.clone(),
-                0,
-            )
-            .await?;
-        state
-            .task_manager
-            .queue_job("job-3", "", timestamp_millis())
-            .await?;
-        state
-            .task_manager
-            .submit_job(
-                "job-3",
-                "",
-                session_ctx.session_id().as_str(),
-                plan.clone(),
-                0,
-            )
-            .await?;
-        state
-            .task_manager
-            .queue_job("job-4", "", timestamp_millis())
-            .await?;
-        state
-            .task_manager
-            .submit_job(
-                "job-4",
-                "",
-                session_ctx.session_id().as_str(),
-                plan.clone(),
-                0,
-            )
-            .await?;
-
-        let executors = test_executors(1, 4);
-
-        let (executor_metadata, executor_data) = executors[0].clone();
-
-        let reservations = state
-            .executor_manager
-            .register_executor(executor_metadata, executor_data, true)
-            .await?;
-
-        let (result, pending) = state.offer_reservation(reservations).await?;
-
-        assert_eq!(pending, 0);
-        assert!(result.is_empty());
-
-        // All task slots should be assigned so we should not be able to reserve more tasks
-        let reservations = state.executor_manager.reserve_slots(4).await?;
-
-        assert_eq!(reservations.len(), 0);
-
-        Ok(())
-    }
-
-    // We should generate a new event for tasks that are still pending
-    #[tokio::test]
-    async fn test_offer_resubmit_pending() -> Result<()> {
-        let config = BallistaConfig::builder()
-            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
-            .build()?;
-
-        let scheduler_config = SchedulerConfig::default();
-        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_task_launcher(
-                test_cluster_context(),
-                BallistaCodec::default(),
-                TEST_SCHEDULER_NAME.into(),
-                Arc::new(scheduler_config),
-                Arc::new(BlackholeTaskLauncher::default()),
-            ));
-
-        let session_ctx = state.session_manager.create_session(&config).await?;
-
-        let plan = test_graph(session_ctx.clone()).await;
-
-        // Create a job
-        state
-            .task_manager
-            .queue_job("job-1", "", timestamp_millis())
-            .await?;
-        state
-            .task_manager
-            .submit_job(
-                "job-1",
-                "",
-                session_ctx.session_id().as_str(),
-                plan.clone(),
-                0,
-            )
-            .await?;
-
-        let executors = test_executors(1, 4);
-
-        let (executor_metadata, executor_data) = executors[0].clone();
-
-        // Complete the first stage. So we should now have 4 pending tasks for this job stage 2
-        {
-            let plan_graph = state
-                .task_manager
-                .get_active_execution_graph("job-1")
-                .unwrap();
-            let task_def = plan_graph
-                .write()
-                .await
-                .pop_next_task(&executor_data.executor_id)?
-                .unwrap();
-            let mut partitions: Vec<ShuffleWritePartition> = vec![];
-            for partition_id in 0..4 {
-                partitions.push(ShuffleWritePartition {
-                    partition_id: partition_id as u64,
-                    path: "some/path".to_string(),
-                    num_batches: 1,
-                    num_rows: 1,
-                    num_bytes: 1,
-                })
-            }
-            state
-                .task_manager
-                .update_task_statuses(
-                    &executor_metadata,
-                    vec![TaskStatus {
-                        task_id: task_def.task_id as u32,
-                        job_id: "job-1".to_string(),
-                        stage_id: task_def.partition.stage_id as u32,
-                        stage_attempt_num: task_def.stage_attempt_num as u32,
-                        partition_id: task_def.partition.partition_id as u32,
-                        launch_time: 0,
-                        start_exec_time: 0,
-                        end_exec_time: 0,
-                        metrics: vec![],
-                        status: Some(task_status::Status::Successful(SuccessfulTask {
-                            executor_id: executor_data.executor_id.clone(),
-                            partitions,
-                        })),
-                    }],
-                )
-                .await?;
-        }
-
-        state
-            .executor_manager
-            .register_executor(executor_metadata, executor_data, false)
-            .await?;
-
-        let reservations = state.executor_manager.reserve_slots(1).await?;
-
-        assert_eq!(reservations.len(), 1);
-
-        // Offer the reservation. It should be filled with one of the 4 pending tasks. The other 3 should
-        // be reserved for the other 3 tasks, emitting another offer event
-        let (reservations, pending) = state.offer_reservation(reservations).await?;
-
-        assert_eq!(pending, 3);
-        assert_eq!(reservations.len(), 3);
-
-        // Remaining 3 task slots should be reserved for pending tasks
-        let reservations = state.executor_manager.reserve_slots(4).await?;
-
-        assert_eq!(reservations.len(), 0);
-
-        Ok(())
-    }
-
-    fn test_executors(
-        total_executors: usize,
-        slots_per_executor: u32,
-    ) -> Vec<(ExecutorMetadata, ExecutorData)> {
-        let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
-
-        for i in 0..total_executors {
-            result.push((
-                ExecutorMetadata {
-                    id: format!("executor-{i}"),
-                    host: format!("host-{i}"),
-                    port: 8080,
-                    grpc_port: 9090,
-                    specification: ExecutorSpecification {
-                        task_slots: slots_per_executor,
-                    },
-                },
-                ExecutorData {
-                    executor_id: format!("executor-{i}"),
-                    total_task_slots: slots_per_executor,
-                    available_task_slots: slots_per_executor,
-                },
-            ));
-        }
-
-        result
-    }
-
-    async fn test_graph(ctx: Arc<SessionContext>) -> Arc<dyn ExecutionPlan> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Utf8, false),
-            Field::new("gmv", DataType::UInt64, false),
-        ]);
-
-        let plan = scan_empty(None, &schema, Some(vec![0, 1]))
-            .unwrap()
-            .aggregate(vec![col("id")], vec![sum(col("gmv"))])
-            .unwrap()
-            .build()
-            .unwrap();
-
-        ctx.state().create_physical_plan(&plan).await.unwrap()
-    }
-}
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 6bf245ca..67f2857b 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -20,20 +20,20 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::state::execution_graph::{
     ExecutionGraph, ExecutionStage, RunningTaskInfo, TaskDescription,
 };
-use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
+use crate::state::executor_manager::ExecutorManager;
 
 use ballista_core::error::BallistaError;
 use ballista_core::error::Result;
 
 use crate::cluster::JobState;
 use ballista_core::serde::protobuf::{
-    self, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
+    job_status, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
 use ballista_core::serde::BallistaCodec;
 use dashmap::DashMap;
-use datafusion::physical_plan::ExecutionPlan;
 
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use log::{debug, error, info, warn};
@@ -101,19 +101,9 @@ impl TaskLauncher for DefaultTaskLauncher {
                 executor.id, tasks_ids
             );
         }
-        let mut client = executor_manager.get_client(&executor.id).await?;
-        client
-            .launch_multi_task(protobuf::LaunchMultiTaskParams {
-                multi_tasks: tasks,
-                scheduler_id: self.scheduler_id.clone(),
-            })
-            .await
-            .map_err(|e| {
-                BallistaError::Internal(format!(
-                    "Failed to connect to executor {}: {:?}",
-                    executor.id, e
-                ))
-            })?;
+        executor_manager
+            .launch_multi_task(&executor.id, tasks, self.scheduler_id.clone())
+            .await?;
         Ok(())
     }
 }
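
The launch RPC that used to live in DefaultTaskLauncher is now hidden behind ExecutorManager::launch_multi_task. Reconstructed from the removed lines above, the consolidated helper is presumably close to the following sketch (the exact body in executor_manager.rs may differ, and the Vec<MultiTaskDefinition> parameter type is assumed):

    // Sketch only: reconstructed from the DefaultTaskLauncher code removed above.
    pub async fn launch_multi_task(
        &self,
        executor_id: &str,
        multi_tasks: Vec<MultiTaskDefinition>,
        scheduler_id: String,
    ) -> Result<()> {
        // Reuse the manager's cached executor client instead of having every
        // caller dial its own connection.
        let mut client = self.get_client(executor_id).await?;
        client
            .launch_multi_task(protobuf::LaunchMultiTaskParams {
                multi_tasks,
                scheduler_id,
            })
            .await
            .map_err(|e| {
                BallistaError::Internal(format!(
                    "Failed to connect to executor {}: {:?}",
                    executor_id, e
                ))
            })?;
        Ok(())
    }

Centralizing the client lookup and the error mapping in one place keeps TaskLauncher implementations free of transport details.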
@@ -129,17 +119,21 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 }
 
 #[derive(Clone)]
-struct JobInfoCache {
+pub struct JobInfoCache {
     // Cache for active execution graphs curated by this scheduler
-    execution_graph: Arc<RwLock<ExecutionGraph>>,
+    pub execution_graph: Arc<RwLock<ExecutionGraph>>,
+    // Cache for job status
+    pub status: Option<job_status::Status>,
     // Cache for encoded execution stage plan to avoid duplicated encoding for multiple tasks
     encoded_stage_plans: HashMap<usize, Vec<u8>>,
 }
 
 impl JobInfoCache {
-    fn new(graph: ExecutionGraph) -> Self {
+    pub fn new(graph: ExecutionGraph) -> Self {
+        let status = graph.status().status.clone();
         Self {
             execution_graph: Arc::new(RwLock::new(graph)),
+            status,
             encoded_stage_plans: HashMap::new(),
         }
     }
@@ -186,13 +180,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     }
 
     /// Enqueue a job for scheduling
-    pub async fn queue_job(
-        &self,
-        job_id: &str,
-        job_name: &str,
-        queued_at: u64,
-    ) -> Result<()> {
-        self.state.accept_job(job_id, job_name, queued_at).await
+    pub fn queue_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
+        self.state.accept_job(job_id, job_name, queued_at)
+    }
+
+    /// Get the number of queued jobs. A large value means the scheduler is falling behind;
+    /// under normal load it should stay close to 0.
+    pub fn pending_job_number(&self) -> usize {
+        self.state.pending_job_number()
+    }
+
+    /// Get the number of running jobs.
+    pub fn running_job_number(&self) -> usize {
+        self.active_job_cache.len()
     }
 
     /// Generate an ExecutionGraph for the job and save it to the persistent state.
@@ -225,6 +225,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         Ok(())
     }
 
+    pub fn get_running_job_cache(&self) -> Arc<HashMap<String, JobInfoCache>> {
+        let ret = self
+            .active_job_cache
+            .iter()
+            .filter_map(|pair| {
+                let (job_id, job_info) = pair.pair();
+                if matches!(job_info.status, Some(job_status::Status::Running(_))) {
+                    Some((job_id.clone(), job_info.clone()))
+                } else {
+                    None
+                }
+            })
+            .collect::<HashMap<_, _>>();
+        Arc::new(ret)
+    }
+
     /// Get a list of active job ids
     pub async fn get_jobs(&self) -> Result<Vec<JobOverview>> {
         let job_ids = self.state.get_jobs().await?;
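
Both counters are cheap to read: the pending count comes from the job state backend, while the running count is just the size of the in-memory active_job_cache, so callers can poll them freely. Below is a minimal, self-contained sketch of the kind of load probe one could build on top of them; the SchedulerLoad type and its rule are illustrative assumptions, not part of this patch:

    // Illustrative only: a trivial load probe over the two new counters.
    // `SchedulerLoad` and its rule are assumptions, not part of the patch.
    #[derive(Debug)]
    struct SchedulerLoad {
        pending_jobs: usize,
        running_jobs: usize,
    }

    impl SchedulerLoad {
        /// A healthy scheduler starts queued jobs almost immediately, so the
        /// pending count should hover near zero no matter how many jobs run.
        fn is_backlogged(&self) -> bool {
            self.pending_jobs > 0
        }
    }

    fn main() {
        let idle = SchedulerLoad { pending_jobs: 0, running_jobs: 8 };
        assert!(!idle.is_backlogged());
        let busy = SchedulerLoad { pending_jobs: 5, running_jobs: 8 };
        assert!(busy.is_backlogged());
        println!("{busy:?} backlogged = {}", busy.is_backlogged());
    }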
@@ -251,7 +267,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         if let Some(graph) = self.get_active_execution_graph(job_id) {
             let guard = graph.read().await;
 
-            Ok(Some(guard.status()))
+            Ok(Some(guard.status().clone()))
         } else {
             self.state.get_job_status(job_id).await
         }
@@ -320,61 +336,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         Ok(events)
     }
 
-    /// Take a list of executor reservations and fill them with tasks that are ready
-    /// to be scheduled.
-    ///
-    /// Here we use the following  algorithm:
-    ///
-    /// 1. For each free reservation, try to assign a task from one of the active jobs
-    /// 2. If we cannot find a task in all active jobs, then add the reservation to the list of unassigned reservations
-    ///
-    /// Finally, we return:
-    /// 1. A list of assignments which is a (Executor ID, Task) tuple
-    /// 2. A list of unassigned reservations which we could not find tasks for
-    /// 3. The number of pending tasks across active jobs
-    pub async fn fill_reservations(
-        &self,
-        reservations: &[ExecutorReservation],
-    ) -> Result<(
-        Vec<(String, TaskDescription)>,
-        Vec<ExecutorReservation>,
-        usize,
-    )> {
-        // Reinitialize the free reservations.
-        let free_reservations: Vec<ExecutorReservation> = reservations
-            .iter()
-            .map(|reservation| {
-                ExecutorReservation::new_free(reservation.executor_id.clone())
-            })
-            .collect();
-
-        let mut assignments: Vec<(String, TaskDescription)> = vec![];
-        let mut pending_tasks = 0usize;
-        let mut assign_tasks = 0usize;
-        for pairs in self.active_job_cache.iter() {
-            let (_job_id, job_info) = pairs.pair();
-            let mut graph = job_info.execution_graph.write().await;
-            for reservation in free_reservations.iter().skip(assign_tasks) {
-                if let Some(task) = graph.pop_next_task(&reservation.executor_id)? {
-                    assignments.push((reservation.executor_id.clone(), task));
-                    assign_tasks += 1;
-                } else {
-                    break;
-                }
-            }
-            if assign_tasks >= free_reservations.len() {
-                pending_tasks += graph.available_tasks();
-                break;
-            }
-        }
-
-        let mut unassigned = vec![];
-        for reservation in free_reservations.iter().skip(assign_tasks) {
-            unassigned.push(reservation.clone());
-        }
-        Ok((assignments, unassigned, pending_tasks))
-    }
-
     /// Mark a job to success. This will create a key under the CompletedJobs keyspace
     /// and remove the job from ActiveJobs
     pub(crate) async fn succeed_job(&self, job_id: &str) -> Result<()> {
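
In place of the removed reservation-filling loop, scheduling now proceeds task first: the runnable tasks of each running job look for a free executor slot, rather than reserved slots looking for tasks. The following self-contained sketch illustrates the round-robin flavor of that binding; SlotState and bind_round_robin are simplified stand-ins for illustration, not the actual cluster implementation:

    // Simplified, self-contained illustration of task-first binding.
    // `SlotState` and `bind_round_robin` are stand-ins, not Ballista types.
    struct SlotState {
        executor_id: String,
        available_slots: u32,
    }

    /// Bind up to `pending_tasks` tasks to executors round-robin and return
    /// (executor_id, task_index) pairs. Tasks drive the loop: each pending task
    /// looks for a slot, instead of each reserved slot looking for a task.
    fn bind_round_robin(
        slots: &mut [SlotState],
        pending_tasks: usize,
    ) -> Vec<(String, usize)> {
        let mut bound = Vec::new();
        if slots.is_empty() {
            return bound;
        }
        let mut cursor = 0usize;
        for task_idx in 0..pending_tasks {
            // Scan at most one full cycle for an executor with a free slot.
            let mut scanned = 0;
            while scanned < slots.len() && slots[cursor].available_slots == 0 {
                cursor = (cursor + 1) % slots.len();
                scanned += 1;
            }
            if slots[cursor].available_slots == 0 {
                break; // every executor is saturated; remaining tasks stay pending
            }
            slots[cursor].available_slots -= 1;
            bound.push((slots[cursor].executor_id.clone(), task_idx));
            cursor = (cursor + 1) % slots.len();
        }
        bound
    }

    fn main() {
        let mut slots = vec![
            SlotState { executor_id: "executor-0".into(), available_slots: 2 },
            SlotState { executor_id: "executor-1".into(), available_slots: 1 },
        ];
        // Four pending tasks but only three free slots: one task stays pending.
        let bound = bind_round_robin(&mut slots, 4);
        assert_eq!(bound.len(), 3);
        println!("{bound:?}");
    }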
@@ -410,7 +371,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         failure_reason: String,
     ) -> Result<(Vec<RunningTaskInfo>, usize)> {
         let (tasks_to_cancel, pending_tasks) = if let Some(graph) =
-            self.get_active_execution_graph(job_id)
+            self.remove_active_execution_graph(job_id)
         {
             let mut guard = graph.write().await;
 
@@ -717,7 +678,7 @@ impl From<&ExecutionGraph> for JobOverview {
         Self {
             job_id: value.job_id().to_string(),
             job_name: value.job_name().to_string(),
-            status: value.status(),
+            status: value.status().clone(),
             start_time: value.start_time(),
             end_time: value.end_time(),
             num_stages: value.stage_count(),
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index 1821c729..fe274688 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -55,7 +55,6 @@ use datafusion::test_util::scan_empty;
 use crate::cluster::BallistaCluster;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
-use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 use crate::state::execution_graph::{ExecutionGraph, TaskDescription};
 use ballista_core::utils::default_session_builder;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
@@ -451,7 +450,7 @@ impl SchedulerTest {
             scheduler
                 .state
                 .executor_manager
-                .register_executor(metadata, executor_data, false)
+                .register_executor(metadata, executor_data)
                 .await?;
         }
 
@@ -462,8 +461,12 @@ impl SchedulerTest {
         })
     }
 
-    pub fn pending_tasks(&self) -> usize {
-        self.scheduler.pending_tasks()
+    pub fn pending_job_number(&self) -> usize {
+        self.scheduler.pending_job_number()
+    }
+
+    pub fn running_job_number(&self) -> usize {
+        self.scheduler.running_job_number()
     }
 
     pub async fn ctx(&self) -> Result<Arc<SessionContext>> {
@@ -655,12 +658,6 @@ impl SchedulerTest {
 
         final_status
     }
-
-    pub(crate) fn query_stage_scheduler(
-        &self,
-    ) -> Arc<QueryStageScheduler<LogicalPlanNode, PhysicalPlanNode>> {
-        self.scheduler.query_stage_scheduler()
-    }
 }
 
 #[derive(Clone)]
@@ -786,6 +783,13 @@ pub fn assert_failed_event(job_id: &str, collector: &TestMetricsCollector) {
 }
 
 pub async fn test_aggregation_plan(partition: usize) -> ExecutionGraph {
+    test_aggregation_plan_with_job_id(partition, "job").await
+}
+
+pub async fn test_aggregation_plan_with_job_id(
+    partition: usize,
+    job_id: &str,
+) -> ExecutionGraph {
     let config = SessionConfig::new().with_target_partitions(partition);
     let ctx = Arc::new(SessionContext::with_config(config));
     let session_state = ctx.state();
@@ -811,7 +815,7 @@ pub async fn test_aggregation_plan(partition: usize) -> ExecutionGraph {
 
     println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-    ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 0).unwrap()
+    ExecutionGraph::new("localhost:50050", job_id, "", "session", plan, 0).unwrap()
 }
 
 pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {