You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/16 11:26:05 UTC

[arrow-datafusion] branch master updated: Refactor scheduler state with different management policy for volatile and stable states (#1810)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 407adc0  Refactor scheduler state with different management policy for volatile and stable states (#1810)
407adc0 is described below

commit 407adc06cc33f075b72bd8197fca4b34ee623390
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Feb 16 19:25:30 2022 +0800

    Refactor scheduler state with different management policy for volatile and stable states (#1810)
    
    * Refactor ballista state info
    
    Cache volatile state just in memory without storing them in db
    
    Fix ut
    
    Keep volatile state just in memory rather than store them in db
    
    Cache stable state in memory
    
    Fix ut
    
    Fix for mingmwang's comments
    
    Rename partition_id to task_id in protobuf::TaskStatus
    
    Rename the names of the in-memory structs
    
    * Enhance error handling
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/rust/core/proto/ballista.proto       |   19 +-
 ballista/rust/core/src/serde/scheduler/mod.rs |   18 +-
 ballista/rust/executor/src/execution_loop.rs  |   20 +-
 ballista/rust/executor/src/executor.rs        |   24 +-
 ballista/rust/executor/src/executor_server.rs |   69 +-
 ballista/rust/executor/src/lib.rs             |    4 +-
 ballista/rust/executor/src/main.rs            |   16 +-
 ballista/rust/executor/src/standalone.rs      |   49 +-
 ballista/rust/scheduler/src/lib.rs            |  340 ++++----
 ballista/rust/scheduler/src/standalone.rs     |   23 +-
 ballista/rust/scheduler/src/state/mod.rs      | 1059 ++++++++++++++++---------
 11 files changed, 955 insertions(+), 686 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 32e363d..a0925b4 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -848,13 +848,16 @@ message ColumnStats {
   uint32 distinct_count = 4;
 }
 
+// Used by scheduler
 message ExecutorMetadata {
   string id = 1;
   string host = 2;
   uint32 port = 3;
   uint32 grpc_port = 4;
+  ExecutorSpecification specification = 5;
 }
 
+// Used by grpc
 message ExecutorRegistration {
   string id = 1;
   // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
@@ -864,10 +867,11 @@ message ExecutorRegistration {
   }
   uint32 port = 3;
   uint32 grpc_port = 4;
+  ExecutorSpecification specification = 5;
 }
 
 message ExecutorHeartbeat {
-  ExecutorMetadata meta = 1;
+  string executor_id = 1;
   // Unix epoch-based timestamp in seconds
   uint64 timestamp = 2;
   ExecutorState state = 3;
@@ -929,7 +933,7 @@ message ShuffleWritePartition {
 }
 
 message TaskStatus {
-  PartitionId partition_id = 1;
+  PartitionId task_id = 1;
   oneof status {
     RunningTask running = 2;
     FailedTask failed = 3;
@@ -957,19 +961,18 @@ message PollWorkResult {
 
 message RegisterExecutorParams {
   ExecutorRegistration metadata = 1;
-  ExecutorSpecification specification = 2;
 }
 
 message RegisterExecutorResult {
   bool success = 1;
 }
 
-message SendHeartBeatParams {
-  ExecutorRegistration metadata = 1;
+message HeartBeatParams {
+  string executor_id = 1;
   ExecutorState state = 2;
 }
 
-message SendHeartBeatResult {
+message HeartBeatResult {
   // TODO it's from Spark for BlockManager
   bool reregister = 1;
 }
@@ -981,7 +984,7 @@ message StopExecutorResult {
 }
 
 message UpdateTaskStatusParams {
-  ExecutorRegistration metadata = 1;
+  string executor_id = 1;
   // All tasks must be reported until they reach the failed or completed state
   repeated TaskStatus task_status = 2;
 }
@@ -1067,7 +1070,7 @@ service SchedulerGrpc {
 
   // Push-based task scheduler will only leverage this interface
   // rather than the PollWork interface to report executor states
-  rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {}
+  rpc HeartBeatFromExecutor (HeartBeatParams) returns (HeartBeatResult) {}
 
   rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}
 
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index 43438e2..a03811c 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -66,43 +66,47 @@ impl PartitionId {
 #[derive(Debug, Clone)]
 pub struct PartitionLocation {
     pub partition_id: PartitionId,
-    pub executor_meta: ExecutorMeta,
+    pub executor_meta: ExecutorMetadata,
     pub partition_stats: PartitionStats,
     pub path: String,
 }
 
 /// Meta-data for an executor, used when fetching shuffle partitions from other executors
 #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
-pub struct ExecutorMeta {
+pub struct ExecutorMetadata {
     pub id: String,
     pub host: String,
     pub port: u16,
     pub grpc_port: u16,
+    pub specification: ExecutorSpecification,
 }
 
 #[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
+impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
     fn into(self) -> protobuf::ExecutorMetadata {
         protobuf::ExecutorMetadata {
             id: self.id,
             host: self.host,
             port: self.port as u32,
             grpc_port: self.grpc_port as u32,
+            specification: Some(self.specification.into()),
         }
     }
 }
 
-impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
+impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
     fn from(meta: protobuf::ExecutorMetadata) -> Self {
         Self {
             id: meta.id,
             host: meta.host,
             port: meta.port as u16,
             grpc_port: meta.grpc_port as u16,
+            specification: meta.specification.unwrap().into(),
         }
     }
 }
 
+/// Specification of an executor, indicating executor resources, like total task slots
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
 pub struct ExecutorSpecification {
     pub task_slots: u32,
@@ -136,6 +140,7 @@ impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
     }
 }
 
+/// From Spark, available resources for an executor, like available task slots
 #[derive(Debug, Clone, Serialize)]
 pub struct ExecutorData {
     pub executor_id: String,
@@ -204,6 +209,7 @@ impl From<protobuf::ExecutorData> for ExecutorData {
     }
 }
 
+/// The internal state of an executor, like cpu usage, memory usage, etc
 #[derive(Debug, Clone, Copy, Serialize)]
 pub struct ExecutorState {
     // in bytes
@@ -359,7 +365,7 @@ pub struct ExecutePartition {
     /// The physical plan for this query stage
     pub plan: Arc<dyn ExecutionPlan>,
     /// Location of shuffle partitions that this query stage may depend on
-    pub shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
+    pub shuffle_locations: HashMap<PartitionId, ExecutorMetadata>,
     /// Output partitioning for shuffle writes
     pub output_partitioning: Option<Partitioning>,
 }
@@ -370,7 +376,7 @@ impl ExecutePartition {
         stage_id: usize,
         partition_id: Vec<usize>,
         plan: Arc<dyn ExecutionPlan>,
-        shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
+        shuffle_locations: HashMap<PartitionId, ExecutorMetadata>,
         output_partitioning: Option<Partitioning>,
     ) -> Self {
         Self {
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index a324ba4..ddb2c97 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -23,7 +23,6 @@ use datafusion::physical_plan::ExecutionPlan;
 use log::{debug, error, info, warn};
 use tonic::transport::Channel;
 
-use ballista_core::serde::protobuf::ExecutorRegistration;
 use ballista_core::serde::protobuf::{
     scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
     TaskDefinition, TaskStatus,
@@ -33,16 +32,23 @@ use crate::as_task_status;
 use crate::executor::Executor;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
 
 pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     mut scheduler: SchedulerGrpcClient<Channel>,
     executor: Arc<Executor>,
-    executor_meta: ExecutorRegistration,
-    concurrent_tasks: usize,
     codec: BallistaCodec<T, U>,
 ) {
-    let available_tasks_slots = Arc::new(AtomicUsize::new(concurrent_tasks));
+    let executor_specification: ExecutorSpecification = executor
+        .metadata
+        .specification
+        .as_ref()
+        .unwrap()
+        .clone()
+        .into();
+    let available_tasks_slots =
+        Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
     let (task_status_sender, mut task_status_receiver) =
         std::sync::mpsc::channel::<TaskStatus>();
 
@@ -61,7 +67,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
             tonic::Status,
         > = scheduler
             .poll_work(PollWorkParams {
-                metadata: Some(executor_meta.clone()),
+                metadata: Some(executor.metadata.clone()),
                 can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0,
                 task_status,
             })
@@ -74,7 +80,6 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 if let Some(task) = result.into_inner().task {
                     match run_received_tasks(
                         executor.clone(),
-                        executor_meta.id.clone(),
                         available_tasks_slots.clone(),
                         task_status_sender,
                         task,
@@ -106,7 +111,6 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 
 async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     executor: Arc<Executor>,
-    executor_id: String,
     available_tasks_slots: Arc<AtomicUsize>,
     task_status_sender: Sender<TaskStatus>,
     task: TaskDefinition,
@@ -146,7 +150,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
         let _ = task_status_sender.send(as_task_status(
             execution_result,
-            executor_id,
+            executor.metadata.id.clone(),
             task_id,
         ));
     });
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index 5902897..def5b28 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
-use ballista_core::serde::scheduler::ExecutorSpecification;
+use ballista_core::serde::protobuf::ExecutorRegistration;
 use datafusion::error::DataFusionError;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -31,11 +31,11 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 
 /// Ballista executor
 pub struct Executor {
-    /// Directory for storing partial results
-    work_dir: String,
+    /// Metadata
+    pub metadata: ExecutorRegistration,
 
-    /// Specification like total task slots
-    pub specification: ExecutorSpecification,
+    /// Directory for storing partial results
+    pub work_dir: String,
 
     /// DataFusion execution context
     pub ctx: Arc<ExecutionContext>,
@@ -43,22 +43,14 @@ pub struct Executor {
 
 impl Executor {
     /// Create a new executor instance
-    pub fn new(work_dir: &str, ctx: Arc<ExecutionContext>) -> Self {
-        Executor::new_with_specification(
-            work_dir,
-            ExecutorSpecification { task_slots: 4 },
-            ctx,
-        )
-    }
-
-    pub fn new_with_specification(
+    pub fn new(
+        metadata: ExecutorRegistration,
         work_dir: &str,
-        specification: ExecutorSpecification,
         ctx: Arc<ExecutionContext>,
     ) -> Self {
         Self {
+            metadata,
             work_dir: work_dir.to_owned(),
-            specification,
             ctx,
         }
     }
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 7e4c9a1..0c065c3 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
 
-use log::{debug, info};
+use log::{debug, error, info};
 use tonic::transport::{Channel, Server};
 use tonic::{Request, Response, Status};
 
@@ -31,11 +31,10 @@ use ballista_core::serde::protobuf::executor_grpc_server::{
 use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
 use ballista_core::serde::protobuf::{
-    ExecutorRegistration, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
-    SendHeartBeatParams, StopExecutorParams, StopExecutorResult, TaskDefinition,
-    UpdateTaskStatusParams,
+    HeartBeatParams, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
+    StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
 };
-use ballista_core::serde::scheduler::{ExecutorSpecification, ExecutorState};
+use ballista_core::serde::scheduler::ExecutorState;
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
 use datafusion::physical_plan::ExecutionPlan;
 
@@ -45,7 +44,6 @@ use crate::executor::Executor;
 pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     mut scheduler: SchedulerGrpcClient<Channel>,
     executor: Arc<Executor>,
-    executor_meta: ExecutorRegistration,
     codec: BallistaCodec<T, U>,
 ) {
     // TODO make the buffer size configurable
@@ -54,14 +52,13 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     let executor_server = ExecutorServer::new(
         scheduler.clone(),
         executor.clone(),
-        executor_meta.clone(),
         ExecutorEnv { tx_task },
         codec,
     );
 
     // 1. Start executor grpc service
     {
-        let executor_meta = executor_meta.clone();
+        let executor_meta = executor.metadata.clone();
         let addr = format!(
             "{}:{}",
             executor_meta
@@ -83,8 +80,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     let executor_server = Arc::new(executor_server);
 
     // 2. Do executor registration
-    match register_executor(&mut scheduler, &executor_meta, &executor.specification).await
-    {
+    match register_executor(&mut scheduler, executor.clone()).await {
         Ok(_) => {
             info!("Executor registration succeed");
         }
@@ -109,13 +105,11 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
 #[allow(clippy::clone_on_copy)]
 async fn register_executor(
     scheduler: &mut SchedulerGrpcClient<Channel>,
-    executor_meta: &ExecutorRegistration,
-    specification: &ExecutorSpecification,
+    executor: Arc<Executor>,
 ) -> Result<(), BallistaError> {
     let result = scheduler
         .register_executor(RegisterExecutorParams {
-            metadata: Some(executor_meta.clone()),
-            specification: Some(specification.clone().into()),
+            metadata: Some(executor.metadata.clone()),
         })
         .await?;
     if result.into_inner().success {
@@ -131,7 +125,6 @@ async fn register_executor(
 pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
     _start_time: u128,
     executor: Arc<Executor>,
-    executor_meta: ExecutorRegistration,
     scheduler: SchedulerGrpcClient<Channel>,
     executor_env: ExecutorEnv,
     codec: BallistaCodec<T, U>,
@@ -148,7 +141,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
     fn new(
         scheduler: SchedulerGrpcClient<Channel>,
         executor: Arc<Executor>,
-        executor_meta: ExecutorRegistration,
         executor_env: ExecutorEnv,
         codec: BallistaCodec<T, U>,
     ) -> Self {
@@ -158,7 +150,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
                 .unwrap()
                 .as_millis(),
             executor,
-            executor_meta,
             scheduler,
             executor_env,
             codec,
@@ -169,9 +160,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         // TODO Error handling
         self.scheduler
             .clone()
-            .send_heart_beat(SendHeartBeatParams {
-                metadata: Some(self.executor_meta.clone()),
-                state: Some(self.get_executor_state().await.into()),
+            .heart_beat_from_executor(HeartBeatParams {
+                executor_id: self.executor.metadata.id.clone(),
+                state: Some(self.get_executor_state().into()),
             })
             .await
             .unwrap();
@@ -211,14 +202,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         info!("Done with task {}", task_id_log);
         debug!("Statistics: {:?}", execution_result);
 
+        let executor_id = &self.executor.metadata.id;
         // TODO use another channel to update the status of a task set
         self.scheduler
             .clone()
             .update_task_status(UpdateTaskStatusParams {
-                metadata: Some(self.executor_meta.clone()),
+                executor_id: executor_id.clone(),
                 task_status: vec![as_task_status(
                     execution_result,
-                    self.executor_meta.id.clone(),
+                    executor_id.clone(),
                     task_id,
                 )],
             })
@@ -228,7 +220,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
     }
 
     // TODO with real state
-    async fn get_executor_state(&self) -> ExecutorState {
+    fn get_executor_state(&self) -> ExecutorState {
         ExecutorState {
             available_memory_size: u64::MAX,
         }
@@ -270,13 +262,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         tokio::spawn(async move {
             info!("Starting the task runner pool");
             loop {
-                let task = rx_task.recv().await.unwrap();
-                info!("Received task {:?}", task);
-
-                let server = executor_server.clone();
-                tokio::spawn(async move {
-                    server.run_task(task).await.unwrap();
-                });
+                if let Some(task) = rx_task.recv().await {
+                    if let Some(task_id) = &task.task_id {
+                        let task_id_log = format!(
+                            "{}/{}/{}",
+                            task_id.job_id, task_id.stage_id, task_id.partition_id
+                        );
+                        info!("Received task {:?}", &task_id_log);
+
+                        let server = executor_server.clone();
+                        tokio::spawn(async move {
+                            server.run_task(task).await.unwrap_or_else(|e| {
+                                error!(
+                                    "Fail to run the task {:?} due to {:?}",
+                                    task_id_log, e
+                                );
+                            });
+                        });
+                    } else {
+                        error!("There's no task id in the task definition {:?}", task);
+                    }
+                } else {
+                    info!("Channel is closed and will exit the loop");
+                    return;
+                }
             }
         });
     }
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index a2711da..6e516d9 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -43,7 +43,7 @@ pub fn as_task_status(
             info!("Task {:?} finished", task_id);
 
             TaskStatus {
-                partition_id: Some(task_id),
+                task_id: Some(task_id),
                 status: Some(task_status::Status::Completed(CompletedTask {
                     executor_id,
                     partitions,
@@ -55,7 +55,7 @@ pub fn as_task_status(
             info!("Task {:?} failed: {}", task_id, error_msg);
 
             TaskStatus {
-                partition_id: Some(task_id),
+                task_id: Some(task_id),
                 status: Some(task_status::Status::Failed(FailedTask {
                     error: format!("Task failed due to Tokio error: {}", error_msg),
                 })),
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 3440f82..39c94b0 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -100,13 +100,16 @@ async fn main() -> Result<()> {
             .map(executor_registration::OptionalHost::Host),
         port: port as u32,
         grpc_port: grpc_port as u32,
+        specification: Some(
+            ExecutorSpecification {
+                task_slots: opt.concurrent_tasks as u32,
+            }
+            .into(),
+        ),
     };
-    let executor_specification = ExecutorSpecification {
-        task_slots: opt.concurrent_tasks as u32,
-    };
-    let executor: Arc<Executor> = Arc::new(Executor::new_with_specification(
+    let executor = Arc::new(Executor::new(
+        executor_meta,
         &work_dir,
-        executor_specification,
         Arc::new(ExecutionContext::new()),
     ));
 
@@ -123,7 +126,6 @@ async fn main() -> Result<()> {
             tokio::spawn(executor_server::startup(
                 scheduler,
                 executor.clone(),
-                executor_meta,
                 default_codec,
             ));
         }
@@ -131,8 +133,6 @@ async fn main() -> Result<()> {
             tokio::spawn(execution_loop::poll_loop(
                 scheduler,
                 executor.clone(),
-                executor_meta,
-                opt.concurrent_tasks,
                 default_codec,
             ));
         }
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index fa5f06e..0bc2503 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 
 use arrow_flight::flight_service_server::FlightServiceServer;
 
+use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
 use ballista_core::{
     error::Result,
@@ -43,17 +44,6 @@ pub async fn new_standalone_executor<
     concurrent_tasks: usize,
     codec: BallistaCodec<T, U>,
 ) -> Result<()> {
-    let work_dir = TempDir::new()?
-        .into_path()
-        .into_os_string()
-        .into_string()
-        .unwrap();
-    let ctx = Arc::new(ExecutionContext::new());
-    let executor: Arc<Executor> = Arc::new(Executor::new(&work_dir, ctx));
-
-    let service = BallistaFlightService::new(executor.clone());
-
-    let server = FlightServiceServer::new(service);
     // Let the OS assign a random, free port
     let listener = TcpListener::bind("localhost:0").await?;
     let addr = listener.local_addr()?;
@@ -61,24 +51,37 @@ pub async fn new_standalone_executor<
         "Ballista v{} Rust Executor listening on {:?}",
         BALLISTA_VERSION, addr
     );
-    tokio::spawn(
-        Server::builder().add_service(server).serve_with_incoming(
-            tokio_stream::wrappers::TcpListenerStream::new(listener),
-        ),
-    );
+
     let executor_meta = ExecutorRegistration {
         id: Uuid::new_v4().to_string(), // assign this executor a unique ID
         optional_host: Some(OptionalHost::Host("localhost".to_string())),
         port: addr.port() as u32,
         // TODO Make it configurable
         grpc_port: 50020,
+        specification: Some(
+            ExecutorSpecification {
+                task_slots: concurrent_tasks as u32,
+            }
+            .into(),
+        ),
     };
-    tokio::spawn(execution_loop::poll_loop(
-        scheduler,
-        executor,
-        executor_meta,
-        concurrent_tasks,
-        codec,
-    ));
+    let work_dir = TempDir::new()?
+        .into_path()
+        .into_os_string()
+        .into_string()
+        .unwrap();
+    info!("work_dir: {}", work_dir);
+    let ctx = Arc::new(ExecutionContext::new());
+    let executor = Arc::new(Executor::new(executor_meta, &work_dir, ctx));
+
+    let service = BallistaFlightService::new(executor.clone());
+    let server = FlightServiceServer::new(service);
+    tokio::spawn(
+        Server::builder().add_service(server).serve_with_incoming(
+            tokio_stream::wrappers::TcpListenerStream::new(listener),
+        ),
+    );
+
+    tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
     Ok(())
 }
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 3459cce..2c202b5 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -49,16 +49,14 @@ use std::{convert::TryInto, sync::Arc};
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
     scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams,
-    ExecuteQueryResult, FailedJob, FileType, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
-    LaunchTaskParams, PartitionId, PollWorkParams, PollWorkResult, QueuedJob,
-    RegisterExecutorParams, RegisterExecutorResult, RunningJob, SendHeartBeatParams,
-    SendHeartBeatResult, TaskDefinition, TaskStatus, UpdateTaskStatusParams,
+    ExecuteQueryResult, ExecutorHeartbeat, FailedJob, FileType, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+    HeartBeatResult, JobStatus, LaunchTaskParams, PartitionId, PollWorkParams,
+    PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult,
+    RunningJob, TaskDefinition, TaskStatus, UpdateTaskStatusParams,
     UpdateTaskStatusResult,
 };
-use ballista_core::serde::scheduler::{
-    ExecutorData, ExecutorMeta, ExecutorSpecification,
-};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 
 use clap::ArgEnum;
 use datafusion::physical_plan::ExecutionPlan;
@@ -111,13 +109,15 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 use tokio::sync::{mpsc, RwLock};
 use tonic::transport::Channel;
 
+type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
+
 #[derive(Clone)]
 pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
     pub(crate) state: Arc<SchedulerState<T, U>>,
     start_time: u128,
     policy: TaskSchedulingPolicy,
     scheduler_env: Option<SchedulerEnv>,
-    executors_client: Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>,
+    executors_client: Option<ExecutorsClient>,
     ctx: Arc<RwLock<ExecutionContext>>,
     codec: BallistaCodec<T, U>,
 }
@@ -153,11 +153,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         codec: BallistaCodec<T, U>,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(config, namespace, codec.clone()));
-        let state_clone = state.clone();
-
-        // TODO: we should elect a leader in the scheduler cluster and run this only in the leader
-        tokio::spawn(async move { state_clone.synchronize_job_status_loop().await });
 
+        let executors_client = if matches!(policy, TaskSchedulingPolicy::PushStaged) {
+            Some(Arc::new(RwLock::new(HashMap::new())))
+        } else {
+            None
+        };
         Self {
             state,
             start_time: SystemTime::now()
@@ -166,36 +167,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                 .as_millis(),
             policy,
             scheduler_env,
-            executors_client: Arc::new(RwLock::new(HashMap::new())),
+            executors_client,
             ctx,
             codec,
         }
     }
 
+    pub async fn init(&self) -> Result<(), BallistaError> {
+        let ctx = self.ctx.read().await;
+        self.state.init(&ctx).await?;
+
+        Ok(())
+    }
+
     async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> {
-        let alive_executors = self
-            .state
-            .get_alive_executors_metadata_within_one_minute()
-            .await?;
-        let alive_executors: HashMap<String, ExecutorMeta> = alive_executors
-            .into_iter()
-            .map(|e| (e.id.clone(), e))
-            .collect();
-        let available_executors = self.state.get_available_executors_data().await?;
-        let mut available_executors: Vec<ExecutorData> = available_executors
-            .into_iter()
-            .filter(|e| alive_executors.contains_key(&e.executor_id))
-            .collect();
+        let mut available_executors = self.state.get_available_executors_data();
 
         // In case of there's no enough resources, reschedule the tasks of the job
         if available_executors.is_empty() {
             let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
             // TODO Maybe it's better to use an exclusive runtime for this kind task scheduling
-            tokio::spawn(async move {
-                warn!("Not enough available executors for task running");
-                tokio::time::sleep(Duration::from_millis(100)).await;
-                tx_job.send(job_id).await.unwrap();
-            });
+            warn!("Not enough available executors for task running");
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            tx_job.send(job_id).await.unwrap();
             return Ok(());
         }
 
@@ -210,12 +204,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                         tasks, executor_data.executor_id
                     );
                     let mut client = {
-                        let clients = self.executors_client.read().await;
+                        let clients =
+                            self.executors_client.as_ref().unwrap().read().await;
                         info!("Size of executor clients: {:?}", clients.len());
                         clients.get(&executor_data.executor_id).unwrap().clone()
                     };
                     // Update the resources first
-                    self.state.save_executor_data(executor_data.clone()).await?;
+                    self.state.save_executor_data(executor_data.clone());
                     // TODO check whether launching task is successful or not
                     client.launch_task(LaunchTaskParams { task: tasks }).await?;
                 } else {
@@ -241,7 +236,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             ret.push(Vec::new());
         }
         let mut num_tasks = 0;
-        let ctx = self.ctx.read().await;
         loop {
             info!("Go inside fetching task loop");
             let mut has_tasks = true;
@@ -251,7 +245,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                 }
                 let plan = self
                     .state
-                    .assign_next_schedulable_job_task(&executor.executor_id, job_id, &ctx)
+                    .assign_next_schedulable_job_task(&executor.executor_id, job_id)
                     .await
                     .map_err(|e| {
                         let msg = format!("Error finding next assignable task: {}", e);
@@ -259,13 +253,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                         tonic::Status::internal(msg)
                     })?;
                 if let Some((task, _plan)) = &plan {
-                    let partition_id = task.partition_id.as_ref().unwrap();
+                    let task_id = task.task_id.as_ref().unwrap();
                     info!(
                         "Sending new task to {}: {}/{}/{}",
                         executor.executor_id,
-                        partition_id.job_id,
-                        partition_id.stage_id,
-                        partition_id.partition_id
+                        task_id.job_id,
+                        task_id.stage_id,
+                        task_id.partition_id
                     );
                 }
                 match plan {
@@ -297,7 +291,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
 
                         ret[idx].push(TaskDefinition {
                             plan: buf,
-                            task_id: status.partition_id,
+                            task_id: status.task_id,
                             output_partitioning: hash_partitioning_to_proto(
                                 output_partitioning,
                             )
@@ -366,12 +360,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
         &self,
         _request: Request<ScaledObjectRef>,
     ) -> Result<Response<IsActiveResponse>, tonic::Status> {
-        let tasks = self.state.get_all_tasks().await.map_err(|e| {
-            let msg = format!("Error reading tasks: {}", e);
-            error!("{}", msg);
-            tonic::Status::internal(msg)
-        })?;
-        let result = tasks.iter().any(|(_key, task)| {
+        let tasks = self.state.get_all_tasks();
+        let result = tasks.iter().any(|task| {
             !matches!(
                 task.status,
                 Some(task_status::Status::Completed(_))
@@ -429,7 +419,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         } = request.into_inner()
         {
             debug!("Received poll_work request for {:?}", metadata);
-            let metadata: ExecutorMeta = ExecutorMeta {
+            let metadata = ExecutorMetadata {
                 id: metadata.id,
                 host: metadata
                     .optional_host
@@ -439,20 +429,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
                 port: metadata.port as u16,
                 grpc_port: metadata.grpc_port as u16,
+                specification: metadata.specification.unwrap().into(),
             };
-            let mut lock = self.state.lock().await.map_err(|e| {
-                let msg = format!("Could not lock the state: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?;
-            self.state
-                .save_executor_metadata(metadata.clone())
-                .await
-                .map_err(|e| {
-                    let msg = format!("Could not save executor metadata: {}", e);
-                    error!("{}", msg);
-                    tonic::Status::internal(msg)
-                })?;
+            let executor_heartbeat = ExecutorHeartbeat {
+                executor_id: metadata.id.clone(),
+                timestamp: SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .expect("Time went backwards")
+                    .as_secs(),
+                state: None,
+            };
+            // In case that it's the first time to poll work, do registration
+            if let Some(_executor_meta) = self.state.get_executor_metadata(&metadata.id) {
+            } else {
+                self.state
+                    .save_executor_metadata(metadata.clone())
+                    .await
+                    .map_err(|e| {
+                        let msg = format!("Could not save executor metadata: {}", e);
+                        error!("{}", msg);
+                        tonic::Status::internal(msg)
+                    })?;
+            }
+            self.state.save_executor_heartbeat(executor_heartbeat);
             for task_status in task_status {
                 self.state
                     .save_task_status(&task_status)
@@ -464,10 +463,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     })?;
             }
             let task: Result<Option<_>, Status> = if can_accept_task {
-                let ctx = self.ctx.read().await;
                 let plan = self
                     .state
-                    .assign_next_schedulable_task(&metadata.id, &ctx)
+                    .assign_next_schedulable_task(&metadata.id)
                     .await
                     .map_err(|e| {
                         let msg = format!("Error finding next assignable task: {}", e);
@@ -475,13 +473,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                         tonic::Status::internal(msg)
                     })?;
                 if let Some((task, _plan)) = &plan {
-                    let partition_id = task.partition_id.as_ref().unwrap();
+                    let task_id = task.task_id.as_ref().unwrap();
                     info!(
                         "Sending new task to {}: {}/{}/{}",
                         metadata.id,
-                        partition_id.job_id,
-                        partition_id.stage_id,
-                        partition_id.partition_id
+                        task_id.job_id,
+                        task_id.stage_id,
+                        task_id.partition_id
                     );
                 }
                 match plan {
@@ -511,7 +509,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                         })?;
                         Ok(Some(TaskDefinition {
                             plan: buf,
-                            task_id: status.partition_id,
+                            task_id: status.task_id,
                             output_partitioning: hash_partitioning_to_proto(
                                 output_partitioning,
                             )
@@ -523,7 +521,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             } else {
                 Ok(None)
             };
-            lock.unlock().await;
             Ok(Response::new(PollWorkResult { task: task? }))
         } else {
             warn!("Received invalid executor poll_work request");
@@ -540,11 +537,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         let remote_addr = request.remote_addr();
         if let RegisterExecutorParams {
             metadata: Some(metadata),
-            specification: Some(specification),
         } = request.into_inner()
         {
             info!("Received register executor request for {:?}", metadata);
-            let metadata: ExecutorMeta = ExecutorMeta {
+            let metadata = ExecutorMetadata {
                 id: metadata.id,
                 host: metadata
                     .optional_host
@@ -554,6 +550,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
                 port: metadata.port as u16,
                 grpc_port: metadata.grpc_port as u16,
+                specification: metadata.specification.unwrap().into(),
             };
             // Check whether the executor starts the grpc service
             {
@@ -564,16 +561,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     .await
                     .context("Could not connect to executor")
                     .map_err(|e| tonic::Status::internal(format!("{:?}", e)))?;
-                let mut clients = self.executors_client.write().await;
+                let mut clients = self.executors_client.as_ref().unwrap().write().await;
                 // TODO check duplicated registration
                 clients.insert(metadata.id.clone(), executor_client);
                 info!("Size of executor clients: {:?}", clients.len());
             }
-            let mut lock = self.state.lock().await.map_err(|e| {
-                let msg = format!("Could not lock the state: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?;
             self.state
                 .save_executor_metadata(metadata.clone())
                 .await
@@ -582,21 +574,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     error!("{}", msg);
                     tonic::Status::internal(msg)
                 })?;
-            let executor_spec: ExecutorSpecification = specification.into();
             let executor_data = ExecutorData {
                 executor_id: metadata.id.clone(),
-                total_task_slots: executor_spec.task_slots,
-                available_task_slots: executor_spec.task_slots,
+                total_task_slots: metadata.specification.task_slots,
+                available_task_slots: metadata.specification.task_slots,
             };
-            self.state
-                .save_executor_data(executor_data)
-                .await
-                .map_err(|e| {
-                    let msg = format!("Could not save executor data: {}", e);
-                    error!("{}", msg);
-                    tonic::Status::internal(msg)
-                })?;
-            lock.unlock().await;
+            self.state.save_executor_data(executor_data);
             Ok(Response::new(RegisterExecutorResult { success: true }))
         } else {
             warn!("Received invalid register executor request");
@@ -606,120 +589,77 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         }
     }
 
-    async fn send_heart_beat(
+    async fn heart_beat_from_executor(
         &self,
-        request: Request<SendHeartBeatParams>,
-    ) -> Result<Response<SendHeartBeatResult>, Status> {
-        let remote_addr = request.remote_addr();
-        if let SendHeartBeatParams {
-            metadata: Some(metadata),
-            state: Some(state),
-        } = request.into_inner()
-        {
-            debug!("Received heart beat request for {:?}", metadata);
-            trace!("Related executor state is {:?}", state);
-            let metadata: ExecutorMeta = ExecutorMeta {
-                id: metadata.id,
-                host: metadata
-                    .optional_host
-                    .map(|h| match h {
-                        OptionalHost::Host(host) => host,
-                    })
-                    .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
-                port: metadata.port as u16,
-                grpc_port: metadata.grpc_port as u16,
-            };
-            {
-                let mut lock = self.state.lock().await.map_err(|e| {
-                    let msg = format!("Could not lock the state: {}", e);
-                    error!("{}", msg);
-                    tonic::Status::internal(msg)
-                })?;
-                self.state
-                    .save_executor_state(metadata, Some(state))
-                    .await
-                    .map_err(|e| {
-                        let msg = format!("Could not save executor metadata: {}", e);
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    })?;
-                lock.unlock().await;
-            }
-            Ok(Response::new(SendHeartBeatResult { reregister: false }))
-        } else {
-            warn!("Received invalid executor heart beat request");
-            Err(tonic::Status::invalid_argument(
-                "Missing metadata or metrics in request",
-            ))
-        }
+        request: Request<HeartBeatParams>,
+    ) -> Result<Response<HeartBeatResult>, Status> {
+        let HeartBeatParams { executor_id, state } = request.into_inner();
+
+        debug!("Received heart beat request for {:?}", executor_id);
+        trace!("Related executor state is {:?}", state);
+        let executor_heartbeat = ExecutorHeartbeat {
+            executor_id,
+            timestamp: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .expect("Time went backwards")
+                .as_secs(),
+            state,
+        };
+        self.state.save_executor_heartbeat(executor_heartbeat);
+        Ok(Response::new(HeartBeatResult { reregister: false }))
     }
 
     async fn update_task_status(
         &self,
         request: Request<UpdateTaskStatusParams>,
     ) -> Result<Response<UpdateTaskStatusResult>, Status> {
-        if let UpdateTaskStatusParams {
-            metadata: Some(metadata),
+        let UpdateTaskStatusParams {
+            executor_id,
             task_status,
-        } = request.into_inner()
+        } = request.into_inner();
+
+        debug!(
+            "Received task status update request for executor {:?}",
+            executor_id
+        );
+        trace!("Related task status is {:?}", task_status);
+        let mut jobs = HashSet::new();
         {
-            debug!("Received task status update request for {:?}", metadata);
-            trace!("Related task status is {:?}", task_status);
-            let mut jobs = HashSet::new();
-            {
-                let mut lock = self.state.lock().await.map_err(|e| {
-                    let msg = format!("Could not lock the state: {}", e);
-                    error!("{}", msg);
-                    tonic::Status::internal(msg)
-                })?;
-                let num_tasks = task_status.len();
-                for task_status in task_status {
-                    self.state
-                        .save_task_status(&task_status)
-                        .await
-                        .map_err(|e| {
-                            let msg = format!("Could not save task status: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
-                    if task_status.partition_id.is_some() {
-                        jobs.insert(task_status.partition_id.unwrap().job_id.clone());
-                    }
-                }
-                let mut executor_data = self
-                    .state
-                    .get_executor_data(&metadata.id)
-                    .await
-                    .map_err(|e| {
-                        let msg = format!(
-                            "Could not get metadata data for id {:?}: {}",
-                            &metadata.id, e
-                        );
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    })?;
-                executor_data.available_task_slots += num_tasks as u32;
+            let num_tasks = task_status.len();
+            for task_status in task_status {
                 self.state
-                    .save_executor_data(executor_data)
+                    .save_task_status(&task_status)
                     .await
                     .map_err(|e| {
-                        let msg = format!("Could not save metadata data: {}", e);
+                        let msg = format!("Could not save task status: {}", e);
                         error!("{}", msg);
                         tonic::Status::internal(msg)
                     })?;
-                lock.unlock().await;
+                if let Some(task_id) = task_status.task_id {
+                    jobs.insert(task_id.job_id.clone());
+                }
             }
-            let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
+            if let Some(mut executor_data) = self.state.get_executor_data(&executor_id) {
+                executor_data.available_task_slots += num_tasks as u32;
+                self.state.save_executor_data(executor_data);
+            } else {
+                error!("Fail to get executor data for {:?}", &executor_id);
+            }
+        }
+        if let Some(scheduler_env) = self.scheduler_env.as_ref() {
+            let tx_job = scheduler_env.tx_job.clone();
             for job_id in jobs {
-                tx_job.send(job_id).await.unwrap();
+                tx_job.send(job_id.clone()).await.map_err(|e| {
+                    let msg = format!(
+                        "Could not send job {} to the channel due to {:?}",
+                        &job_id, e
+                    );
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
             }
-            Ok(Response::new(UpdateTaskStatusResult { success: true }))
-        } else {
-            warn!("Received invalid task status update request");
-            Err(tonic::Status::invalid_argument(
-                "Missing metadata or task status in request",
-            ))
         }
+        Ok(Response::new(UpdateTaskStatusResult { success: true }))
     }
 
     async fn get_file_metadata(
@@ -932,7 +872,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                         shuffle_writer.output_partitioning().partition_count();
                     for partition_id in 0..num_partitions {
                         let pending_status = TaskStatus {
-                            partition_id: Some(PartitionId {
+                            task_id: Some(PartitionId {
                                 job_id: job_id_spawn.clone(),
                                 stage_id: shuffle_writer.stage_id() as u32,
                                 partition_id: partition_id as u32,
@@ -967,11 +907,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
     ) -> std::result::Result<Response<GetJobStatusResult>, tonic::Status> {
         let job_id = request.into_inner().job_id;
         debug!("Received get_job_status request for job {}", job_id);
-        let job_meta = self.state.get_job_metadata(&job_id).await.map_err(|e| {
-            let msg = format!("Error reading job metadata: {}", e);
-            error!("{}", msg);
-            tonic::Status::internal(msg)
-        })?;
+        let job_meta = self.state.get_job_metadata(&job_id).unwrap();
         Ok(Response::new(GetJobStatusResult {
             status: Some(job_meta),
         }))
@@ -997,6 +933,7 @@ mod test {
         executor_registration::OptionalHost, ExecutorRegistration, LogicalPlanNode,
         PhysicalPlanNode, PollWorkParams,
     };
+    use ballista_core::serde::scheduler::ExecutorSpecification;
     use ballista_core::serde::BallistaCodec;
     use datafusion::prelude::ExecutionContext;
 
@@ -1007,22 +944,21 @@ mod test {
 
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
-        let state = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let namespace = "default";
         let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
-                state.clone(),
+                state_storage.clone(),
                 namespace.to_owned(),
                 Arc::new(RwLock::new(ExecutionContext::new())),
                 BallistaCodec::default(),
             );
-        let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
-            SchedulerState::new(state, namespace.to_string(), BallistaCodec::default());
         let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
             optional_host: Some(OptionalHost::Host("".to_owned())),
             port: 0,
             grpc_port: 0,
+            specification: Some(ExecutorSpecification { task_slots: 2 }.into()),
         };
         let request: Request<PollWorkParams> = Request::new(PollWorkParams {
             metadata: Some(exec_meta.clone()),
@@ -1036,6 +972,14 @@ mod test {
             .into_inner();
         // no response task since we told the scheduler we didn't want to accept one
         assert!(response.task.is_none());
+        let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
+            SchedulerState::new(
+                state_storage.clone(),
+                namespace.to_string(),
+                BallistaCodec::default(),
+            );
+        let ctx = scheduler.ctx.read().await;
+        state.init(&ctx).await?;
         // executor should be registered
         assert_eq!(state.get_executors_metadata().await.unwrap().len(), 1);
 
@@ -1051,6 +995,14 @@ mod test {
             .into_inner();
         // still no response task since there are no tasks in the scheduelr
         assert!(response.task.is_none());
+        let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
+            SchedulerState::new(
+                state_storage.clone(),
+                namespace.to_string(),
+                BallistaCodec::default(),
+            );
+        let ctx = scheduler.ctx.read().await;
+        state.init(&ctx).await?;
         // executor should be registered
         assert_eq!(state.get_executors_metadata().await.unwrap().len(), 1);
         Ok(())
diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs
index 2d7250e..790df0d 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use ballista_core::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use ballista_core::serde::BallistaCodec;
 use ballista_core::{
-    error::Result,
-    serde::protobuf::{
-        scheduler_grpc_server::SchedulerGrpcServer, LogicalPlanNode, PhysicalPlanNode,
-    },
+    error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
     BALLISTA_VERSION,
 };
 use datafusion::prelude::ExecutionContext;
@@ -35,14 +33,15 @@ use crate::{state::StandaloneClient, SchedulerServer};
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
     let client = StandaloneClient::try_new_temporary()?;
 
-    let server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> = SchedulerServer::new(
-        Arc::new(client),
-        "ballista".to_string(),
-        Arc::new(RwLock::new(ExecutionContext::new())),
-        BallistaCodec::default(),
-    );
-
-    let server = SchedulerGrpcServer::new(server);
+    let scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+        SchedulerServer::new(
+            Arc::new(client),
+            "ballista".to_string(),
+            Arc::new(RwLock::new(ExecutionContext::new())),
+            BallistaCodec::default(),
+        );
+    scheduler_server.init().await?;
+    let server = SchedulerGrpcServer::new(scheduler_server);
     // Let the OS assign a random, free port
     let listener = TcpListener::bind("localhost:0").await?;
     let addr = listener.local_addr()?;
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 974516d..d118e9e 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,24 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashSet;
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{any::type_name, collections::HashMap, sync::Arc, time::Duration};
 
+use parking_lot::RwLock;
+
 use datafusion::physical_plan::ExecutionPlan;
-use futures::{Stream, StreamExt};
-use log::{debug, error, info};
+use futures::Stream;
+use log::{debug, error, info, warn};
 use prost::Message;
-use tokio::sync::OwnedMutexGuard;
+use tokio::sync::{mpsc, OwnedMutexGuard};
 
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::execution_plans::UnresolvedShuffleExec;
 use ballista_core::serde::protobuf::{
     self, job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat,
-    ExecutorMetadata, FailedJob, FailedTask, JobStatus, RunningJob, RunningTask,
-    TaskStatus,
+    FailedJob, FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus,
+};
+use ballista_core::serde::scheduler::{
+    ExecutorData, ExecutorMetadata, PartitionId, PartitionStats,
 };
-use ballista_core::serde::scheduler::{ExecutorData, PartitionStats};
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
-use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta};
-use ballista_core::{error::Result, execution_plans::UnresolvedShuffleExec};
 use datafusion::prelude::ExecutionContext;
 
 use super::planner::remove_unresolved_shuffles;
@@ -82,16 +86,179 @@ pub enum WatchEvent {
     Delete(String),
 }
 
+type JobTasks = HashMap<u32, HashMap<u32, TaskStatus>>;
+
 #[derive(Clone)]
-pub(super) struct SchedulerState<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
-{
+struct VolatileSchedulerState {
+    executors_heartbeat: Arc<RwLock<HashMap<String, ExecutorHeartbeat>>>,
+    executors_data: Arc<RwLock<HashMap<String, ExecutorData>>>,
+
+    // job -> stage -> partition
+    tasks: Arc<RwLock<HashMap<String, JobTasks>>>,
+}
+
+/// For in-memory state, we don't use async to provide related services
+impl VolatileSchedulerState {
+    fn new() -> Self {
+        Self {
+            executors_heartbeat: Arc::new(RwLock::new(HashMap::new())),
+            executors_data: Arc::new(RwLock::new(HashMap::new())),
+            tasks: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) {
+        let mut executors_heartbeat = self.executors_heartbeat.write();
+        executors_heartbeat.insert(heartbeat.executor_id.clone(), heartbeat);
+    }
+
+    fn get_executors_heartbeat(&self) -> Vec<ExecutorHeartbeat> {
+        let executors_heartbeat = self.executors_heartbeat.read();
+        executors_heartbeat
+            .iter()
+            .map(|(_exec, heartbeat)| heartbeat.clone())
+            .collect()
+    }
+
+    /// last_seen_ts_threshold is in seconds
+    fn get_alive_executors(&self, last_seen_ts_threshold: u64) -> HashSet<String> {
+        let executors_heartbeat = self.executors_heartbeat.read();
+        executors_heartbeat
+            .iter()
+            .filter_map(|(exec, heartbeat)| {
+                (heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone())
+            })
+            .collect()
+    }
+
+    fn get_alive_executors_within_one_minute(&self) -> HashSet<String> {
+        let now_epoch_ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+        let last_seen_threshold = now_epoch_ts
+            .checked_sub(Duration::from_secs(60))
+            .unwrap_or_else(|| Duration::from_secs(0));
+        self.get_alive_executors(last_seen_threshold.as_secs())
+    }
+
+    fn save_executor_data(&self, executor_data: ExecutorData) {
+        let mut executors_data = self.executors_data.write();
+        executors_data.insert(executor_data.executor_id.clone(), executor_data);
+    }
+
+    fn get_executor_data(&self, executor_id: &str) -> Option<ExecutorData> {
+        let executors_data = self.executors_data.read();
+        executors_data.get(executor_id).cloned()
+    }
+
+    /// There are two checks:
+    /// 1. firstly alive
+    /// 2. secondly available task slots > 0
+    fn get_available_executors_data(&self) -> Vec<ExecutorData> {
+        let mut res = {
+            let alive_executors = self.get_alive_executors_within_one_minute();
+            let executors_data = self.executors_data.read();
+            executors_data
+                .iter()
+                .filter_map(|(exec, data)| {
+                    alive_executors.contains(exec).then(|| data.clone())
+                })
+                .collect::<Vec<ExecutorData>>()
+        };
+        res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+        res
+    }
+
+    fn save_task_status(&self, status: &TaskStatus) {
+        let task_id = status.task_id.as_ref().unwrap();
+        let mut tasks = self.tasks.write();
+        let job_tasks = tasks
+            .entry(task_id.job_id.clone())
+            .or_insert_with(HashMap::new);
+        let stage_tasks = job_tasks
+            .entry(task_id.stage_id)
+            .or_insert_with(HashMap::new);
+        stage_tasks.insert(task_id.partition_id, status.clone());
+    }
+
+    fn _get_task(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        partition_id: usize,
+    ) -> Option<TaskStatus> {
+        let tasks = self.tasks.read();
+        let job_tasks = tasks.get(job_id);
+        if let Some(job_tasks) = job_tasks {
+            let stage_id = stage_id as u32;
+            let stage_tasks = job_tasks.get(&stage_id);
+            if let Some(stage_tasks) = stage_tasks {
+                let partition_id = partition_id as u32;
+                stage_tasks.get(&partition_id).cloned()
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    fn get_job_tasks(&self, job_id: &str) -> Option<Vec<TaskStatus>> {
+        let tasks = self.tasks.read();
+        let job_tasks = tasks.get(job_id);
+
+        if let Some(job_tasks) = job_tasks {
+            let mut res = vec![];
+            VolatileSchedulerState::fill_job_tasks(&mut res, job_tasks);
+            Some(res)
+        } else {
+            None
+        }
+    }
+
+    fn get_tasks(&self) -> Vec<TaskStatus> {
+        let mut res = vec![];
+
+        let tasks = self.tasks.read();
+        for (_job_id, job_tasks) in tasks.iter() {
+            VolatileSchedulerState::fill_job_tasks(&mut res, job_tasks);
+        }
+
+        res
+    }
+
+    fn fill_job_tasks(
+        res: &mut Vec<TaskStatus>,
+        job_tasks: &HashMap<u32, HashMap<u32, TaskStatus>>,
+    ) {
+        for stage_tasks in job_tasks.values() {
+            for task_status in stage_tasks.values() {
+                res.push(task_status.clone());
+            }
+        }
+    }
+}
+
+type StageKey = (String, u32);
+
+#[derive(Clone)]
+struct StableSchedulerState<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
+    // for db
     config_client: Arc<dyn ConfigBackendClient>,
     namespace: String,
     codec: BallistaCodec<T, U>,
+
+    // for in-memory cache
+    executors_metadata: Arc<RwLock<HashMap<String, ExecutorMetadata>>>,
+
+    jobs: Arc<RwLock<HashMap<String, JobStatus>>>,
+    stages: Arc<RwLock<HashMap<StageKey, Arc<dyn ExecutionPlan>>>>,
 }
 
-impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, U> {
-    pub fn new(
+impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
+    StableSchedulerState<T, U>
+{
+    fn new(
         config_client: Arc<dyn ConfigBackendClient>,
         namespace: String,
         codec: BallistaCodec<T, U>,
@@ -100,119 +267,424 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             config_client,
             namespace,
             codec,
+            executors_metadata: Arc::new(RwLock::new(HashMap::new())),
+            jobs: Arc::new(RwLock::new(HashMap::new())),
+            stages: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 
-    pub async fn get_executors_metadata(&self) -> Result<Vec<(ExecutorMeta, Duration)>> {
-        let mut result = vec![];
+    /// Loads the state persisted in the config backend into memory.
+    ///
+    /// Order: executor metadata, job statuses, stage plans; stops at first error.
+    async fn init(&self, ctx: &ExecutionContext) -> Result<()> {
+        self.init_executors_metadata_from_storage().await?;
+        self.init_jobs_from_storage().await?;
+        self.init_stages_from_storage(ctx).await
+    }
+
+    /// Fills the executor-metadata cache from every entry stored under the
+    /// executor-metadata prefix of this namespace, keyed by executor id.
+    async fn init_executors_metadata_from_storage(&self) -> Result<()> {
+        let prefix = get_executors_metadata_prefix(&self.namespace);
+        let stored = self.config_client.get_from_prefix(&prefix).await?;
+
+        let mut cache = self.executors_metadata.write();
+        for (_key, bytes) in stored {
+            let proto: protobuf::ExecutorMetadata = decode_protobuf(&bytes)?;
+            let executor_id = proto.id.clone();
+            cache.insert(executor_id, proto.into());
+        }
+        Ok(())
+    }
 
+    /// Loads every persisted job status into the in-memory job cache.
+    async fn init_jobs_from_storage(&self) -> Result<()> {
         let entries = self
             .config_client
-            .get_from_prefix(&get_executors_prefix(&self.namespace))
+            .get_from_prefix(&get_job_prefix(&self.namespace))
             .await?;
+
+        // A key that yields no job id is reported to the caller as an error
+        // instead of panicking during initialization.
+        let mut jobs = self.jobs.write();
+        for (key, entry) in entries {
+            let job: JobStatus = decode_protobuf(&entry)?;
+            let job_id = extract_job_id_from_job_key(&key)?.to_string();
+            jobs.insert(job_id, job);
+        }
+        Ok(())
+    }
+
+    /// Decodes every persisted stage plan with `ctx` and caches it under its
+    /// `(job_id, stage_id)` key; a malformed key or plan is returned as an error.
+    async fn init_stages_from_storage(&self, ctx: &ExecutionContext) -> Result<()> {
+        let entries = self
+            .config_client
+            .get_from_prefix(&get_stage_prefix(&self.namespace))
+            .await?;
+
+        let mut stages = self.stages.write();
+        for (key, entry) in entries {
+            // Propagate a bad key instead of panicking during initialization.
+            let stage_key = extract_stage_id_from_stage_key(&key)?;
+            let plan = U::try_decode(&entry)?
+                .try_into_physical_plan(ctx, self.codec.physical_extension_codec())?;
+            stages.insert(stage_key, plan);
+        }
+        Ok(())
+    }
+
+    /// Persists `executor_meta` in the config backend, then caches it in memory.
+    ///
+    /// The cache is only updated after the backend write succeeded, so a failed
+    /// save returns the error and leaves the in-memory view untouched.
+    pub async fn save_executor_metadata(
+        &self,
+        executor_meta: ExecutorMetadata,
+    ) -> Result<()> {
+        // Save in db
+        let key = get_executor_metadata_key(&self.namespace, &executor_meta.id);
+        let value = {
+            let proto: protobuf::ExecutorMetadata = executor_meta.clone().into();
+            encode_protobuf(&proto)?
+        };
+        self.synchronize_save(key, value).await?;
+
+        // Save in memory
+        let executor_id = executor_meta.id.clone();
+        self.executors_metadata
+            .write()
+            .insert(executor_id, executor_meta);
+        Ok(())
+    }
+
+    /// Returns a clone of the cached metadata of `executor_id`, if any.
+    fn get_executor_metadata(&self, executor_id: &str) -> Option<ExecutorMetadata> {
+        self.executors_metadata.read().get(executor_id).cloned()
+    }
+
+    /// Returns clones of all cached executor metadata, in unspecified order.
+    fn get_executors_metadata(&self) -> Vec<ExecutorMetadata> {
+        self.executors_metadata.read().values().cloned().collect()
+    }
+
+    /// Persists `status` for `job_id` in the config backend, then caches it.
+    ///
+    /// The in-memory entry is replaced only after the backend write succeeded;
+    /// on failure the error is returned and the cache is left as it was.
+    async fn save_job_metadata(&self, job_id: &str, status: &JobStatus) -> Result<()> {
+        debug!("Saving job metadata: {:?}", status);
+
+        // Save in db
+        let key = get_job_key(&self.namespace, job_id);
+        let encoded = encode_protobuf(status)?;
+        self.synchronize_save(key, encoded).await?;
+
+        // Save in memory
+        let mut job_cache = self.jobs.write();
+        job_cache.insert(job_id.to_string(), status.clone());
+        Ok(())
+    }
+
+    /// Returns a clone of the cached status of `job_id`, if the job is known.
+    fn get_job_metadata(&self, job_id: &str) -> Option<JobStatus> {
+        self.jobs.read().get(job_id).cloned()
+    }
+
+    /// Serializes `plan` for stage `stage_id` of `job_id`, persists it in the
+    /// config backend and, once that succeeded, caches the plan in memory.
+    ///
+    /// `stage_id` is narrowed to `u32` with `as`, for both the storage key and
+    /// the cache key.
+    async fn save_stage_plan(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<()> {
+        let stage_id = stage_id as u32;
+
+        // Save in db
+        let key = get_stage_plan_key(&self.namespace, job_id, stage_id);
+        let encoded = {
+            let extension_codec = self.codec.physical_extension_codec();
+            let proto = U::try_from_physical_plan(plan.clone(), extension_codec)?;
+            let mut bytes: Vec<u8> = vec![];
+            proto.try_encode(&mut bytes)?;
+            bytes
+        };
+        self.synchronize_save(key, encoded).await?;
+
+        // Save in memory
+        let mut stage_cache = self.stages.write();
+        stage_cache.insert((job_id.to_string(), stage_id), plan);
+
+        Ok(())
+    }
+
+    /// Returns the cached plan for `(job_id, stage_id as u32)`, if present.
+    fn get_stage_plan(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        let key = (job_id.to_string(), stage_id as u32);
+        self.stages.read().get(&key).cloned()
+    }
+
+    /// Writes `key`/`value` to the config backend while holding its lock.
+    async fn synchronize_save(&self, key: String, value: Vec<u8>) -> Result<()> {
+        let mut lock = self.config_client.lock().await?;
+        let outcome = self.config_client.put(key, value).await;
+        lock.unlock().await; // always unlock, even when `put` failed
+        outcome
+    }
+}
+
+fn get_executors_metadata_prefix(namespace: &str) -> String {
+    format!("/ballista/{}/executor_metadata", namespace)
+}
+
+fn get_executor_metadata_key(namespace: &str, id: &str) -> String {
+    format!("{}/{}", get_executors_metadata_prefix(namespace), id)
+}
+
+fn get_job_prefix(namespace: &str) -> String {
+    format!("/ballista/{}/jobs", namespace)
+}
+
+fn get_job_key(namespace: &str, id: &str) -> String {
+    format!("{}/{}", get_job_prefix(namespace), id)
+}
+
+fn get_stage_prefix(namespace: &str) -> String {
+    format!("/ballista/{}/stages", namespace,)
+}
+
+fn get_stage_plan_key(namespace: &str, job_id: &str, stage_id: u32) -> String {
+    format!("{}/{}/{}", get_stage_prefix(namespace), job_id, stage_id,)
+}
+
+/// Returns the job id, i.e. the last `/`-separated segment of a job key.
+fn extract_job_id_from_job_key(job_key: &str) -> Result<&str> {
+    let err = || BallistaError::Internal(format!("Unexpected job key: {}", job_key));
+    job_key.rsplit('/').next().filter(|id| !id.is_empty()).ok_or_else(err)
+}
+
+/// Splits a stage key into `(job_id, stage_id)`, taken from its last two
+/// `/`-separated segments, matching the layout built by `get_stage_plan_key`.
+///
+/// Returns an error instead of panicking when the key is too short or the
+/// stage id is not a valid `u32`.
+fn extract_stage_id_from_stage_key(stage_key: &str) -> Result<StageKey> {
+    let err = || BallistaError::Internal(format!("Unexpected stage key: {}", stage_key));
+    // Walk the key from the end: the last segment is the stage id, then the job id.
+    let mut segments = stage_key.rsplit('/');
+    let stage_id = segments.next().ok_or_else(err)?;
+    let job_id = segments.next().filter(|id| !id.is_empty()).ok_or_else(err)?;
+    let stage_id = stage_id.parse::<u32>().map_err(|_| err())?;
+    Ok((job_id.to_string(), stage_id))
+}
+
+/// Decodes `bytes` as a protobuf message of type `T`.
+///
+/// A decode failure is returned as `BallistaError::Internal` naming `T`.
+fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
+    T::decode(bytes).map_err(|cause| {
+        let target = type_name::<T>();
+        BallistaError::Internal(format!("Could not deserialize {}: {}", target, cause))
+    })
+}
+
+/// Serializes `msg` into a new byte vector sized from `encoded_len()`.
+///
+/// An encode failure is returned as `BallistaError::Internal` naming `T`.
+fn encode_protobuf<T: Message + Default>(msg: &T) -> Result<Vec<u8>> {
+    let mut buffer: Vec<u8> = Vec::with_capacity(msg.encoded_len());
+    msg.encode(&mut buffer).map_err(|cause| {
+        let target = type_name::<T>();
+        BallistaError::Internal(format!("Could not serialize {}: {}", target, cause))
+    })?;
+    Ok(buffer)
+}
+
+/// Sending half of the task-status channel; each status passed to `watch`
+/// is consumed by the task spawned in `synchronize_job_status_loop`.
+#[derive(Clone)]
+struct SchedulerStateWatcher {
+    tx_task: mpsc::Sender<TaskStatus>,
+}
+
+impl SchedulerStateWatcher {
+    /// Sends `task_status` on the channel, awaiting the channel's `send`.
+    /// Fails with `BallistaError::Internal` when `send` returns an error.
+    async fn watch(&self, task_status: TaskStatus) -> Result<()> {
+        match self.tx_task.send(task_status).await {
+            Ok(()) => Ok(()),
+            Err(e) => Err(BallistaError::Internal(format!(
+                "Fail to send task status event to channel due to {:?}",
+                e
+            ))),
+        }
+    }
+
+    /// Spawns a tokio task that, for every status received on `rx_task`,
+    /// re-synchronizes the status of the job the task belongs to. Failures are
+    /// logged only; the task ends once `recv` yields `None`.
+    fn synchronize_job_status_loop<
+        T: 'static + AsLogicalPlan,
+        U: 'static + AsExecutionPlan,
+    >(
+        &self,
+        scheduler_state: SchedulerState<T, U>,
+        mut rx_task: mpsc::Receiver<TaskStatus>,
+    ) {
+        tokio::spawn(async move {
+            info!("Starting the scheduler state watcher");
+            while let Some(task_status) = rx_task.recv().await {
+                debug!("Watch on task status {:?}", task_status);
+                if let Some(task_id) = task_status.task_id {
+                    let job_id = &task_id.job_id;
+                    if let Err(e) = scheduler_state.synchronize_job_status(job_id).await {
+                        error!(
+                            "Fail to synchronize the status for job {:?} due to {:?}",
+                            job_id, e
+                        );
+                    }
+                } else {
+                    warn!(
+                        "There's no PartitionId in the task status {:?}",
+                        task_status
+                    );
+                }
+            }
+            info!("Channel is closed and will exit the loop");
+        });
+    }
+}
+
+#[derive(Clone)]
+pub(super) struct SchedulerState<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
+{
+    stable_state: StableSchedulerState<T, U>,
+    volatile_state: VolatileSchedulerState,
+    listener: SchedulerStateWatcher,
+}
+
+impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, U> {
+    /// Builds the state and spawns (`tokio::spawn`) the job-status sync task.
+    pub fn new(
+        config_client: Arc<dyn ConfigBackendClient>,
+        namespace: String,
+        codec: BallistaCodec<T, U>,
+    ) -> Self {
+        // TODO Make the buffer size configurable
+        let (tx_task, rx_task) = mpsc::channel::<TaskStatus>(1000);
+        let state = Self {
+            stable_state: StableSchedulerState::new(config_client, namespace, codec),
+            volatile_state: VolatileSchedulerState::new(),
+            listener: SchedulerStateWatcher { tx_task },
+        };
+        let watcher = &state.listener;
+        watcher.synchronize_job_status_loop(state.clone(), rx_task);
+        state
+    }
+
+    /// Loads the persisted (stable) state into memory by delegating to
+    /// `stable_state.init`; its error, if any, is returned unchanged.
+    pub async fn init(&self, ctx: &ExecutionContext) -> Result<()> {
+        self.stable_state.init(ctx).await
+    }
+
+    pub async fn get_executors_metadata(
+        &self,
+    ) -> Result<Vec<(ExecutorMetadata, Duration)>> {
+        let mut result = vec![];
+
+        let executors_heartbeat = self
+            .volatile_state
+            .get_executors_heartbeat()
+            .into_iter()
+            .map(|heartbeat| (heartbeat.executor_id.clone(), heartbeat))
+            .collect::<HashMap<String, ExecutorHeartbeat>>();
+
+        let executors_metadata = self.stable_state.get_executors_metadata();
+
         let now_epoch_ts = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .expect("Time went backwards");
-        for (_key, entry) in entries {
-            let heartbeat: ExecutorHeartbeat = decode_protobuf(&entry)?;
-            let meta = heartbeat.meta.unwrap();
-            let ts = Duration::from_secs(heartbeat.timestamp);
+
+        for meta in executors_metadata.into_iter() {
+            // If there's no heartbeat info for an executor, regard its heartbeat timestamp as 0
+            // so that it will always be excluded when requesting alive executors
+            let ts = executors_heartbeat
+                .get(&meta.id)
+                .map(|heartbeat| Duration::from_secs(heartbeat.timestamp))
+                .unwrap_or_else(|| Duration::from_secs(0));
             let time_since_last_seen = now_epoch_ts
                 .checked_sub(ts)
                 .unwrap_or_else(|| Duration::from_secs(0));
-            result.push((meta.into(), time_since_last_seen));
+            result.push((meta, time_since_last_seen));
         }
         Ok(result)
     }
 
-    pub async fn get_alive_executors_metadata_within_one_minute(
-        &self,
-    ) -> Result<Vec<ExecutorMeta>> {
-        self.get_alive_executors_metadata(Duration::from_secs(60))
-            .await
-    }
-
     pub async fn get_alive_executors_metadata(
         &self,
         last_seen_threshold: Duration,
-    ) -> Result<Vec<ExecutorMeta>> {
-        Ok(self
-            .get_executors_metadata()
-            .await?
-            .into_iter()
-            .filter_map(|(exec, last_seen)| {
-                (last_seen < last_seen_threshold).then(|| exec)
-            })
-            .collect())
+    ) -> Result<Vec<ExecutorMetadata>> {
+        let mut result = vec![];
+
+        let now_epoch_ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+        let last_seen_ts_threshold = now_epoch_ts
+            .checked_sub(last_seen_threshold)
+            .unwrap_or_else(|| Duration::from_secs(0));
+        let alive_executors = self
+            .volatile_state
+            .get_alive_executors(last_seen_ts_threshold.as_secs());
+        for executor_id in alive_executors {
+            let meta = self.get_executor_metadata(&executor_id);
+            if meta.is_none() {
+                return Err(BallistaError::General(format!(
+                    "No executor metadata found for {}",
+                    executor_id
+                )));
+            }
+            result.push(meta.unwrap());
+        }
+
+        Ok(result)
     }
 
-    pub async fn save_executor_metadata(&self, meta: ExecutorMeta) -> Result<()> {
-        self.save_executor_state(meta, None).await
+    pub fn get_executor_metadata(&self, executor_id: &str) -> Option<ExecutorMetadata> {
+        self.stable_state.get_executor_metadata(executor_id)
     }
 
-    pub async fn save_executor_state(
+    pub async fn save_executor_metadata(
         &self,
-        meta: ExecutorMeta,
-        state: Option<protobuf::ExecutorState>,
+        executor_meta: ExecutorMetadata,
     ) -> Result<()> {
-        let key = get_executor_key(&self.namespace, &meta.id);
-        let meta: ExecutorMetadata = meta.into();
-        let timestamp = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards")
-            .as_secs();
-        let heartbeat = ExecutorHeartbeat {
-            meta: Some(meta),
-            timestamp,
-            state,
-        };
-        let value: Vec<u8> = encode_protobuf(&heartbeat)?;
-        self.config_client.put(key, value).await
+        self.stable_state
+            .save_executor_metadata(executor_meta)
+            .await
     }
 
-    pub async fn save_executor_data(&self, executor_data: ExecutorData) -> Result<()> {
-        let key = get_executor_data_key(&self.namespace, &executor_data.executor_id);
-        let executor_data: protobuf::ExecutorData = executor_data.into();
-        let value: Vec<u8> = encode_protobuf(&executor_data)?;
-        self.config_client.put(key, value).await
+    pub fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) {
+        self.volatile_state.save_executor_heartbeat(heartbeat);
     }
 
-    pub async fn get_executors_data(&self) -> Result<Vec<ExecutorData>> {
-        let mut result = vec![];
-
-        let entries = self
-            .config_client
-            .get_from_prefix(&get_executors_data_prefix(&self.namespace))
-            .await?;
-        for (_key, entry) in entries {
-            let executor_data: protobuf::ExecutorData = decode_protobuf(&entry)?;
-            result.push(executor_data.into());
-        }
-        Ok(result)
+    pub fn save_executor_data(&self, executor_data: ExecutorData) {
+        self.volatile_state.save_executor_data(executor_data);
     }
 
-    pub async fn get_available_executors_data(&self) -> Result<Vec<ExecutorData>> {
-        let mut res = self
-            .get_executors_data()
-            .await?
-            .into_iter()
-            .filter_map(|exec| (exec.available_task_slots > 0).then(|| exec))
-            .collect::<Vec<ExecutorData>>();
-        res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
-        Ok(res)
+    pub fn get_available_executors_data(&self) -> Vec<ExecutorData> {
+        self.volatile_state.get_available_executors_data()
     }
 
-    pub async fn get_executor_data(&self, executor_id: &str) -> Result<ExecutorData> {
-        let key = get_executor_data_key(&self.namespace, executor_id);
-        let value = &self.config_client.get(&key).await?;
-        if value.is_empty() {
-            return Err(BallistaError::General(format!(
-                "No executor data found for {}",
-                key
-            )));
-        }
-        let value: protobuf::ExecutorData = decode_protobuf(value)?;
-        Ok(value.into())
+    pub fn get_executor_data(&self, executor_id: &str) -> Option<ExecutorData> {
+        self.volatile_state.get_executor_data(executor_id)
     }
 
     pub async fn save_job_metadata(
@@ -220,114 +692,55 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         job_id: &str,
         status: &JobStatus,
     ) -> Result<()> {
-        debug!("Saving job metadata: {:?}", status);
-        let key = get_job_key(&self.namespace, job_id);
-        let value = encode_protobuf(status)?;
-        self.config_client.put(key, value).await
-    }
-
-    pub async fn get_job_metadata(&self, job_id: &str) -> Result<JobStatus> {
-        let key = get_job_key(&self.namespace, job_id);
-        let value = &self.config_client.get(&key).await?;
-        if value.is_empty() {
-            return Err(BallistaError::General(format!(
-                "No job metadata found for {}",
-                key
-            )));
-        }
-        let value: JobStatus = decode_protobuf(value)?;
-        Ok(value)
+        self.stable_state.save_job_metadata(job_id, status).await
     }
 
-    pub async fn save_task_status(&self, status: &TaskStatus) -> Result<()> {
-        let partition_id = status.partition_id.as_ref().unwrap();
-        let key = get_task_status_key(
-            &self.namespace,
-            &partition_id.job_id,
-            partition_id.stage_id as usize,
-            partition_id.partition_id as usize,
-        );
-        let value = encode_protobuf(status)?;
-        self.config_client.put(key, value).await
-    }
-
-    pub async fn _get_task_status(
-        &self,
-        job_id: &str,
-        stage_id: usize,
-        partition_id: usize,
-    ) -> Result<TaskStatus> {
-        let key = get_task_status_key(&self.namespace, job_id, stage_id, partition_id);
-        let value = &self.config_client.clone().get(&key).await?;
-        if value.is_empty() {
-            return Err(BallistaError::General(format!(
-                "No task status found for {}",
-                key
-            )));
-        }
-        let value: TaskStatus = decode_protobuf(value)?;
-        Ok(value)
+    pub fn get_job_metadata(&self, job_id: &str) -> Option<JobStatus> {
+        self.stable_state.get_job_metadata(job_id)
     }
 
-    // "Unnecessary" lifetime syntax due to https://github.com/rust-lang/rust/issues/63033
-    pub async fn save_stage_plan<'a>(
-        &'a self,
-        job_id: &'a str,
+    pub async fn save_stage_plan(
+        &self,
+        job_id: &str,
         stage_id: usize,
         plan: Arc<dyn ExecutionPlan>,
     ) -> Result<()> {
-        let key = get_stage_plan_key(&self.namespace, job_id, stage_id);
-        let value = {
-            let mut buf: Vec<u8> = vec![];
-            let proto =
-                U::try_from_physical_plan(plan, self.codec.physical_extension_codec())?;
-            proto.try_encode(&mut buf)?;
-
-            buf
-        };
-        self.config_client.clone().put(key, value).await
+        self.stable_state
+            .save_stage_plan(job_id, stage_id, plan)
+            .await
     }
 
-    pub async fn get_stage_plan(
+    pub fn get_stage_plan(
         &self,
         job_id: &str,
         stage_id: usize,
-        ctx: &ExecutionContext,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let key = get_stage_plan_key(&self.namespace, job_id, stage_id);
-        let value = &self.config_client.get(&key).await?;
-        if value.is_empty() {
-            return Err(BallistaError::General(format!(
-                "No stage plan found for {}",
-                key
-            )));
-        }
-        let value = U::try_decode(value.as_slice())?;
-        let plan =
-            value.try_into_physical_plan(ctx, self.codec.physical_extension_codec())?;
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        self.stable_state.get_stage_plan(job_id, stage_id)
+    }
+
+    pub async fn save_task_status(&self, status: &TaskStatus) -> Result<()> {
+        self.volatile_state.save_task_status(status);
+        self.listener.watch(status.clone()).await?;
 
-        Ok(plan)
+        Ok(())
     }
 
-    pub async fn get_job_tasks(
+    pub fn _get_task_status(
         &self,
         job_id: &str,
-    ) -> Result<HashMap<String, TaskStatus>> {
-        self.config_client
-            .get_from_prefix(&get_task_prefix_for_job(&self.namespace, job_id))
-            .await?
-            .into_iter()
-            .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
-            .collect()
+        stage_id: usize,
+        partition_id: usize,
+    ) -> Option<TaskStatus> {
+        self.volatile_state
+            ._get_task(job_id, stage_id, partition_id)
     }
 
-    pub async fn get_all_tasks(&self) -> Result<HashMap<String, TaskStatus>> {
-        self.config_client
-            .get_from_prefix(&get_task_prefix(&self.namespace))
-            .await?
-            .into_iter()
-            .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
-            .collect()
+    pub fn get_job_tasks(&self, job_id: &str) -> Option<Vec<TaskStatus>> {
+        self.volatile_state.get_job_tasks(job_id)
+    }
+
+    pub fn get_all_tasks(&self) -> Vec<TaskStatus> {
+        self.volatile_state.get_tasks()
     }
 
     /// This function ensures that the task wasn't assigned to an executor that died.
@@ -336,7 +749,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     async fn reschedule_dead_task(
         &self,
         task_status: &TaskStatus,
-        executors: &[ExecutorMeta],
+        executors: &[ExecutorMetadata],
     ) -> Result<bool> {
         let executor_id: &str = match &task_status.status {
             Some(task_status::Status::Completed(CompletedTask {
@@ -353,7 +766,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             info!(
                 "Executor {} isn't alive. Rescheduling task {:?}",
                 executor_id,
-                task_status.partition_id.as_ref().unwrap()
+                task_status.task_id.as_ref().unwrap()
             );
             // Task was handled in an executor that isn't alive anymore, so we can't resolve it
             // We mark the task as pending again and continue
@@ -367,10 +780,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     pub async fn assign_next_schedulable_task(
         &self,
         executor_id: &str,
-        ctx: &ExecutionContext,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        let tasks = self.get_all_tasks().await?;
-        self.assign_next_schedulable_task_inner(executor_id, tasks, ctx)
+        let tasks = self.get_all_tasks();
+        self.assign_next_schedulable_task_inner(executor_id, tasks)
             .await
     }
 
@@ -378,20 +790,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         &self,
         executor_id: &str,
         job_id: &str,
-        ctx: &ExecutionContext,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        let job_tasks = self.get_job_tasks(job_id).await?;
-        self.assign_next_schedulable_task_inner(executor_id, job_tasks, ctx)
-            .await
+        let job_tasks = self.get_job_tasks(job_id);
+        if job_tasks.is_some() {
+            self.assign_next_schedulable_task_inner(executor_id, job_tasks.unwrap())
+                .await
+        } else {
+            Ok(None)
+        }
     }
 
     async fn assign_next_schedulable_task_inner(
         &self,
         executor_id: &str,
-        tasks: HashMap<String, TaskStatus>,
-        ctx: &ExecutionContext,
+        tasks: Vec<TaskStatus>,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        match self.get_next_schedulable_task(tasks, ctx).await? {
+        match self.get_next_schedulable_task(tasks).await? {
             Some((status, plan)) => {
                 let mut status = status.clone();
                 status.status = Some(task_status::Status::Running(RunningTask {
@@ -406,19 +820,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
 
     async fn get_next_schedulable_task(
         &self,
-        tasks: HashMap<String, TaskStatus>,
-        ctx: &ExecutionContext,
+        tasks: Vec<TaskStatus>,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
+        let tasks = tasks
+            .into_iter()
+            .map(|task| {
+                let task_id = task.task_id.as_ref().unwrap();
+                (
+                    PartitionId::new(
+                        &task_id.job_id,
+                        task_id.stage_id as usize,
+                        task_id.partition_id as usize,
+                    ),
+                    task,
+                )
+            })
+            .collect::<HashMap<PartitionId, TaskStatus>>();
         // TODO: Make the duration a configurable parameter
         let executors = self
             .get_alive_executors_metadata(Duration::from_secs(60))
             .await?;
         'tasks: for (_key, status) in tasks.iter() {
             if status.status.is_none() {
-                let partition = status.partition_id.as_ref().unwrap();
+                let task_id = status.task_id.as_ref().unwrap();
                 let plan = self
-                    .get_stage_plan(&partition.job_id, partition.stage_id as usize, ctx)
-                    .await?;
+                    .get_stage_plan(&task_id.job_id, task_id.stage_id as usize)
+                    .unwrap();
 
                 // Let's try to resolve any unresolved shuffles we find
                 let unresolved_shuffles = find_unresolved_shuffles(&plan)?;
@@ -435,14 +862,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                     for shuffle_input_partition_id in
                         0..unresolved_shuffle.input_partition_count
                     {
-                        let referenced_task = tasks
-                            .get(&get_task_status_key(
-                                &self.namespace,
-                                &partition.job_id,
-                                unresolved_shuffle.stage_id,
-                                shuffle_input_partition_id,
-                            ))
-                            .unwrap();
+                        let partition_id = PartitionId {
+                            job_id: task_id.job_id.clone(),
+                            stage_id: unresolved_shuffle.stage_id,
+                            partition_id: shuffle_input_partition_id,
+                        };
+                        let referenced_task = tasks.get(&partition_id).unwrap();
                         let task_is_dead = self
                             .reschedule_dead_task(referenced_task, &executors)
                             .await?;
@@ -479,7 +904,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                                         ballista_core::serde::scheduler::PartitionLocation {
                                             partition_id:
                                             ballista_core::serde::scheduler::PartitionId {
-                                                job_id: partition.job_id.clone(),
+                                                job_id: task_id.job_id.clone(),
                                                 stage_id: unresolved_shuffle.stage_id,
                                                 partition_id: shuffle_write_partition
                                                     .partition_id
@@ -535,53 +960,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         Ok(None)
     }
 
-    // Global lock for the state. We should get rid of this to be able to scale.
-    pub async fn lock(&self) -> Result<Box<dyn Lock>> {
-        self.config_client.lock().await
-    }
-
-    /// This function starts a watch over the task keys. Whenever a task changes, it re-evaluates
-    /// the status for the parent job and updates it accordingly.
-    ///
-    /// The future returned by this function never returns (unless an error happens), so it is wise
-    /// to [tokio::spawn] calls to this method.
-    pub async fn synchronize_job_status_loop(&self) -> Result<()> {
-        let watch = self
-            .config_client
-            .watch(get_task_prefix(&self.namespace))
-            .await?;
-        watch.for_each(|event: WatchEvent| async {
-            let key = match event {
-                WatchEvent::Put(key, _value) => key,
-                WatchEvent::Delete(key) => key
-            };
-            let job_id = extract_job_id_from_task_key(&key).unwrap();
-            match self.lock().await {
-                Ok(mut lock) => {
-                    if let Err(e) = self.synchronize_job_status(job_id).await {
-                        error!("Could not update job status for {}. This job might be stuck forever. Error: {}", job_id, e);
-                    }
-                    lock.unlock().await;
-                },
-                Err(e) => error!("Could not lock config backend. Job {} will have an unsynchronized status and might be stuck forever. Error: {}", job_id, e)
-            }
-        }).await;
-
-        Ok(())
-    }
-
     async fn synchronize_job_status(&self, job_id: &str) -> Result<()> {
-        let value = self
-            .config_client
-            .get(&get_job_key(&self.namespace, job_id))
-            .await?;
-        let executors: HashMap<String, ExecutorMeta> = self
+        let executors: HashMap<String, ExecutorMetadata> = self
             .get_executors_metadata()
             .await?
             .into_iter()
             .map(|(meta, _)| (meta.id.to_string(), meta))
             .collect();
-        let status: JobStatus = decode_protobuf(&value)?;
+        let status: JobStatus = self.stable_state.get_job_metadata(job_id).unwrap();
         let new_status = self.get_job_status_from_tasks(job_id, &executors).await?;
         if let Some(new_status) = new_status {
             if status != new_status {
@@ -600,15 +986,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     async fn get_job_status_from_tasks(
         &self,
         job_id: &str,
-        executors: &HashMap<String, ExecutorMeta>,
+        executors: &HashMap<String, ExecutorMetadata>,
     ) -> Result<Option<JobStatus>> {
-        let statuses = self
-            .config_client
-            .get_from_prefix(&get_task_prefix_for_job(&self.namespace, job_id))
-            .await?
-            .into_iter()
-            .map(|(_k, v)| decode_protobuf::<TaskStatus>(&v))
-            .collect::<Result<Vec<_>>>()?;
+        let statuses = self.volatile_state.get_job_tasks(job_id);
+        if statuses.is_none() {
+            return Ok(None);
+        }
+        let statuses = statuses.unwrap();
         if statuses.is_empty() {
             return Ok(None);
         }
@@ -616,12 +1000,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         // Check for job completion
         let last_stage = statuses
             .iter()
-            .map(|task| task.partition_id.as_ref().unwrap().stage_id)
+            .map(|task| task.task_id.as_ref().unwrap().stage_id)
             .max()
             .unwrap();
         let statuses: Vec<_> = statuses
             .into_iter()
-            .filter(|task| task.partition_id.as_ref().unwrap().stage_id == last_stage)
+            .filter(|task| task.task_id.as_ref().unwrap().stage_id == last_stage)
             .collect();
         let mut job_status = statuses
             .iter()
@@ -637,7 +1021,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             .map(|info| {
                 let mut partition_location = vec![];
                 for (status, executor_id, partitions) in info {
-                    let input_partition_id = status.partition_id.as_ref().unwrap(); //TODO unwrap
+                    let input_partition_id = status.task_id.as_ref().unwrap(); //TODO unwrap
                     let executor_meta =
                         executors.get(executor_id).map(|e| e.clone().into());
                     for shuffle_write_partition in partitions {
@@ -714,99 +1098,19 @@ fn find_unresolved_shuffles(
     }
 }
 
-fn get_executors_prefix(namespace: &str) -> String {
-    format!("/ballista/{}/executors", namespace)
-}
-
-fn get_executor_key(namespace: &str, id: &str) -> String {
-    format!("{}/{}", get_executors_prefix(namespace), id)
-}
-
-fn get_executors_data_prefix(namespace: &str) -> String {
-    format!("/ballista/{}/resources/executors", namespace)
-}
-
-fn get_executor_data_key(namespace: &str, id: &str) -> String {
-    format!("{}/{}", get_executors_data_prefix(namespace), id)
-}
-
-fn get_job_prefix(namespace: &str) -> String {
-    format!("/ballista/{}/jobs", namespace)
-}
-
-fn get_job_key(namespace: &str, id: &str) -> String {
-    format!("{}/{}", get_job_prefix(namespace), id)
-}
-
-fn get_task_prefix(namespace: &str) -> String {
-    format!("/ballista/{}/tasks", namespace)
-}
-
-fn get_task_prefix_for_job(namespace: &str, job_id: &str) -> String {
-    format!("{}/{}", get_task_prefix(namespace), job_id)
-}
-
-fn get_task_status_key(
-    namespace: &str,
-    job_id: &str,
-    stage_id: usize,
-    partition_id: usize,
-) -> String {
-    format!(
-        "{}/{}/{}",
-        get_task_prefix_for_job(namespace, job_id),
-        stage_id,
-        partition_id,
-    )
-}
-
-fn extract_job_id_from_task_key(job_key: &str) -> Result<&str> {
-    job_key.split('/').nth(4).ok_or_else(|| {
-        BallistaError::Internal(format!("Unexpected task key: {}", job_key))
-    })
-}
-
-fn get_stage_plan_key(namespace: &str, job_id: &str, stage_id: usize) -> String {
-    format!("/ballista/{}/stages/{}/{}", namespace, job_id, stage_id,)
-}
-
-fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
-    T::decode(bytes).map_err(|e| {
-        BallistaError::Internal(format!(
-            "Could not deserialize {}: {}",
-            type_name::<T>(),
-            e
-        ))
-    })
-}
-
-fn encode_protobuf<T: Message + Default>(msg: &T) -> Result<Vec<u8>> {
-    let mut value: Vec<u8> = Vec::with_capacity(msg.encoded_len());
-    msg.encode(&mut value).map_err(|e| {
-        BallistaError::Internal(format!(
-            "Could not serialize {}: {}",
-            type_name::<T>(),
-            e
-        ))
-    })?;
-    Ok(value)
-}
-
 #[cfg(all(test, feature = "sled"))]
 mod test {
     use std::sync::Arc;
 
+    use ballista_core::error::BallistaError;
     use ballista_core::serde::protobuf::{
         job_status, task_status, CompletedTask, FailedTask, JobStatus, LogicalPlanNode,
         PartitionId, PhysicalPlanNode, QueuedJob, RunningJob, RunningTask, TaskStatus,
     };
+    use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};
     use ballista_core::serde::BallistaCodec;
-    use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta};
 
-    use super::{
-        extract_job_id_from_task_key, get_task_status_key, SchedulerState,
-        StandaloneClient,
-    };
+    use super::{SchedulerState, StandaloneClient};
 
     #[tokio::test]
     async fn executor_metadata() -> Result<(), BallistaError> {
@@ -816,11 +1120,12 @@ mod test {
                 "test".to_string(),
                 BallistaCodec::default(),
             );
-        let meta = ExecutorMeta {
+        let meta = ExecutorMetadata {
             id: "123".to_owned(),
             host: "localhost".to_owned(),
             port: 123,
             grpc_port: 124,
+            specification: ExecutorSpecification { task_slots: 2 },
         };
         state.save_executor_metadata(meta.clone()).await?;
         let result: Vec<_> = state
@@ -845,7 +1150,7 @@ mod test {
             status: Some(job_status::Status::Queued(QueuedJob {})),
         };
         state.save_job_metadata("job", &meta).await?;
-        let result = state.get_job_metadata("job").await?;
+        let result = state.get_job_metadata("job").unwrap();
         assert!(result.status.is_some());
         match result.status.unwrap() {
             job_status::Status::Queued(_) => (),
@@ -866,8 +1171,8 @@ mod test {
             status: Some(job_status::Status::Queued(QueuedJob {})),
         };
         state.save_job_metadata("job", &meta).await?;
-        let result = state.get_job_metadata("job2").await;
-        assert!(result.is_err());
+        let result = state.get_job_metadata("job2");
+        assert!(result.is_none());
         Ok(())
     }
 
@@ -883,16 +1188,16 @@ mod test {
             status: Some(task_status::Status::Failed(FailedTask {
                 error: "error".to_owned(),
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: "job".to_owned(),
                 stage_id: 1,
                 partition_id: 2,
             }),
         };
         state.save_task_status(&meta).await?;
-        let result = state._get_task_status("job", 1, 2).await?;
-        assert!(result.status.is_some());
-        match result.status.unwrap() {
+        let result = state._get_task_status("job", 1, 2);
+        assert!(result.is_some());
+        match result.unwrap().status.unwrap() {
             task_status::Status::Failed(_) => (),
             _ => panic!("Unexpected status"),
         }
@@ -911,15 +1216,15 @@ mod test {
             status: Some(task_status::Status::Failed(FailedTask {
                 error: "error".to_owned(),
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: "job".to_owned(),
                 stage_id: 1,
                 partition_id: 2,
             }),
         };
         state.save_task_status(&meta).await?;
-        let result = state._get_task_status("job", 25, 2).await;
-        assert!(result.is_err());
+        let result = state._get_task_status("job", 25, 2);
+        assert!(result.is_none());
         Ok(())
     }
 
@@ -936,8 +1241,9 @@ mod test {
             status: Some(job_status::Status::Queued(QueuedJob {})),
         };
         state.save_job_metadata(job_id, &job_status).await?;
+        // Call it explicitly to achieve fast synchronization
         state.synchronize_job_status(job_id).await?;
-        let result = state.get_job_metadata(job_id).await?;
+        let result = state.get_job_metadata(job_id).unwrap();
         assert_eq!(result, job_status);
         Ok(())
     }
@@ -960,7 +1266,7 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 0,
@@ -971,15 +1277,16 @@ mod test {
             status: Some(task_status::Status::Running(RunningTask {
                 executor_id: "".to_owned(),
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 1,
             }),
         };
         state.save_task_status(&meta).await?;
+        // Call it explicitly to achieve fast synchronization
         state.synchronize_job_status(job_id).await?;
-        let result = state.get_job_metadata(job_id).await?;
+        let result = state.get_job_metadata(job_id).unwrap();
         assert_eq!(result, job_status);
         Ok(())
     }
@@ -1002,7 +1309,7 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 0,
@@ -1011,15 +1318,16 @@ mod test {
         state.save_task_status(&meta).await?;
         let meta = TaskStatus {
             status: None,
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 1,
             }),
         };
         state.save_task_status(&meta).await?;
+        // Call it explicitly to achieve fast synchronization
         state.synchronize_job_status(job_id).await?;
-        let result = state.get_job_metadata(job_id).await?;
+        let result = state.get_job_metadata(job_id).unwrap();
         assert_eq!(result, job_status);
         Ok(())
     }
@@ -1042,7 +1350,7 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 0,
@@ -1054,15 +1362,16 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 1,
             }),
         };
         state.save_task_status(&meta).await?;
+        // Call it explicitly to achieve fast synchronization
         state.synchronize_job_status(job_id).await?;
-        let result = state.get_job_metadata(job_id).await?;
+        let result = state.get_job_metadata(job_id).unwrap();
         match result.status.unwrap() {
             job_status::Status::Completed(_) => (),
             status => panic!("Received status: {:?}", status),
@@ -1088,7 +1397,7 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 0,
@@ -1100,15 +1409,16 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 1,
             }),
         };
         state.save_task_status(&meta).await?;
+        // Call it explicitly to achieve fast synchronization
         state.synchronize_job_status(job_id).await?;
-        let result = state.get_job_metadata(job_id).await?;
+        let result = state.get_job_metadata(job_id).unwrap();
         match result.status.unwrap() {
             job_status::Status::Completed(_) => (),
             status => panic!("Received status: {:?}", status),
@@ -1134,7 +1444,7 @@ mod test {
                 executor_id: "".to_owned(),
                 partitions: vec![],
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 0,
@@ -1145,7 +1455,7 @@ mod test {
             status: Some(task_status::Status::Failed(FailedTask {
                 error: "".to_owned(),
             })),
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 1,
@@ -1154,29 +1464,20 @@ mod test {
         state.save_task_status(&meta).await?;
         let meta = TaskStatus {
             status: None,
-            partition_id: Some(PartitionId {
+            task_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
                 stage_id: 0,
                 partition_id: 2,
             }),
         };
         state.save_task_status(&meta).await?;
+        // Call it explicitly to achieve fast synchronization
         state.synchronize_job_status(job_id).await?;
-        let result = state.get_job_metadata(job_id).await?;
+        let result = state.get_job_metadata(job_id).unwrap();
         match result.status.unwrap() {
             job_status::Status::Failed(_) => (),
             status => panic!("Received status: {:?}", status),
         }
         Ok(())
     }
-
-    #[test]
-    fn task_extract_job_id_from_task_key() {
-        let job_id = "foo";
-        assert_eq!(
-            extract_job_id_from_task_key(&get_task_status_key("namespace", job_id, 0, 1))
-                .unwrap(),
-            job_id
-        );
-    }
 }