You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2022/11/21 15:36:22 UTC

[arrow-ballista] branch master updated: Provide a memory StateBackendClient (#523)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new b5fa8720 Provide a memory StateBackendClient (#523)
b5fa8720 is described below

commit b5fa8720b6a6b97654298ba3e609f2568b9118ae
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Nov 21 23:36:17 2022 +0800

    Provide a memory StateBackendClient (#523)
    
    * Rename StateBackend::Standalone to StateBackend:Sled
    
    * Copy utility files from sled crate since they cannot be used directly
    
    * Provide a memory StateBackendClient
    
    * Fix dashmap deadlock issue
    
    * Fix for the comments
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/scheduler/scheduler_config_spec.toml      |   4 +-
 ballista/scheduler/src/main.rs                     |  22 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    |   8 +-
 ballista/scheduler/src/scheduler_server/mod.rs     |   4 +-
 ballista/scheduler/src/standalone.rs               |   6 +-
 ballista/scheduler/src/state/backend/etcd.rs       |   2 +-
 ballista/scheduler/src/state/backend/memory.rs     | 411 +++++++++++++++++++++
 ballista/scheduler/src/state/backend/mod.rs        |   9 +-
 .../src/state/backend/{standalone.rs => sled.rs}   |  18 +-
 ballista/scheduler/src/state/backend/utils/mod.rs  |  21 ++
 .../scheduler/src/state/backend/utils/oneshot.rs   | 179 +++++++++
 .../src/state/backend/utils/subscriber.rs          | 248 +++++++++++++
 ballista/scheduler/src/state/executor_manager.rs   |  10 +-
 ballista/scheduler/src/state/mod.rs                |   8 +-
 ballista/scheduler/src/test_utils.rs               |   4 +-
 15 files changed, 906 insertions(+), 48 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 451becf8..e79abb85 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -33,8 +33,8 @@ doc = "Route for proxying flight results via scheduler. Should be of the form 'I
 abbr = "b"
 name = "config_backend"
 type = "ballista_scheduler::state::backend::StateBackend"
-doc = "The configuration backend for the scheduler, possible values: etcd, standalone. Default: standalone"
-default = "ballista_scheduler::state::backend::StateBackend::Standalone"
+doc = "The configuration backend for the scheduler, possible values: etcd, memory, sled. Default: sled"
+default = "ballista_scheduler::state::backend::StateBackend::Sled"
 
 [[param]]
 abbr = "n"
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 1141a519..66a90f3e 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -21,6 +21,7 @@ use anyhow::{Context, Result};
 #[cfg(feature = "flight-sql")]
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_scheduler::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
+use ballista_scheduler::state::backend::memory::MemoryBackendClient;
 use futures::future::{self, Either, TryFutureExt};
 use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
@@ -37,7 +38,7 @@ use ballista_scheduler::api::{get_routes, EitherBody, Error};
 #[cfg(feature = "etcd")]
 use ballista_scheduler::state::backend::etcd::EtcdClient;
 #[cfg(feature = "sled")]
-use ballista_scheduler::state::backend::standalone::StandaloneClient;
+use ballista_scheduler::state::backend::sled::SledClient;
 use datafusion_proto::protobuf::LogicalPlanNode;
 
 use ballista_scheduler::scheduler_server::SchedulerServer;
@@ -211,10 +212,6 @@ async fn main() -> Result<()> {
     let addr = addr.parse()?;
 
     let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend {
-        #[cfg(not(any(feature = "sled", feature = "etcd")))]
-        _ => std::compile_error!(
-            "To build the scheduler enable at least one config backend feature (`etcd` or `sled`)"
-        ),
         #[cfg(feature = "etcd")]
         StateBackend::Etcd => {
             let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
@@ -229,26 +226,27 @@ async fn main() -> Result<()> {
             )
         }
         #[cfg(feature = "sled")]
-        StateBackend::Standalone => {
+        StateBackend::Sled => {
             if opt.sled_dir.is_empty() {
                 Arc::new(
-                    StandaloneClient::try_new_temporary()
-                        .context("Could not create standalone config backend")?,
+                    SledClient::try_new_temporary()
+                        .context("Could not create sled config backend")?,
                 )
             } else {
                 println!("{}", opt.sled_dir);
                 Arc::new(
-                    StandaloneClient::try_new(opt.sled_dir)
-                        .context("Could not create standalone config backend")?,
+                    SledClient::try_new(opt.sled_dir)
+                        .context("Could not create sled config backend")?,
                 )
             }
         }
         #[cfg(not(feature = "sled"))]
-        StateBackend::Standalone => {
+        StateBackend::Sled => {
             unimplemented!(
-                "build the scheduler with the `sled` feature to use the standalone config backend"
+                "build the scheduler with the `sled` feature to use the sled config backend"
             )
         }
+        StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
     };
 
     let config = SchedulerConfig {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 062e45a7..e61eb93c 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -587,13 +587,13 @@ mod test {
     use ballista_core::utils::default_session_builder;
 
     use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
-    use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
+    use crate::state::{backend::sled::SledClient, SchedulerState};
 
     use super::{SchedulerGrpc, SchedulerServer};
 
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
@@ -680,7 +680,7 @@ mod test {
 
     #[tokio::test]
     async fn test_stop_executor() -> Result<(), BallistaError> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
@@ -761,7 +761,7 @@ mod test {
     #[tokio::test]
     #[ignore]
     async fn test_expired_executor() -> Result<(), BallistaError> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 04410974..f4727f40 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -330,7 +330,7 @@ mod test {
     use ballista_core::serde::BallistaCodec;
 
     use crate::scheduler_server::{timestamp_millis, SchedulerServer};
-    use crate::state::backend::standalone::StandaloneClient;
+    use crate::state::backend::sled::SledClient;
 
     use crate::test_utils::{
         assert_completed_event, assert_failed_event, assert_no_submitted_event,
@@ -598,7 +598,7 @@ mod test {
     async fn test_scheduler(
         scheduling_policy: TaskSchedulingPolicy,
     ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index 975ad293..43f3e505 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -17,9 +17,7 @@
 
 use crate::config::SchedulerConfig;
 use crate::metrics::default_metrics_collector;
-use crate::{
-    scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient,
-};
+use crate::{scheduler_server::SchedulerServer, state::backend::sled::SledClient};
 use ballista_core::serde::protobuf::PhysicalPlanNode;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::create_grpc_server;
@@ -33,7 +31,7 @@ use std::{net::SocketAddr, sync::Arc};
 use tokio::net::TcpListener;
 
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
-    let client = StandaloneClient::try_new_temporary()?;
+    let client = SledClient::try_new_temporary()?;
 
     let metrics_collector = default_metrics_collector()?;
 
diff --git a/ballista/scheduler/src/state/backend/etcd.rs b/ballista/scheduler/src/state/backend/etcd.rs
index f0d48b3f..5fc83aaa 100644
--- a/ballista/scheduler/src/state/backend/etcd.rs
+++ b/ballista/scheduler/src/state/backend/etcd.rs
@@ -34,7 +34,7 @@ use crate::state::backend::{
     Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
 };
 
-/// A [`StateBackendClient`] implementation that uses etcd to save cluster configuration.
+/// A [`StateBackendClient`] implementation that uses etcd to save cluster state.
 #[derive(Clone)]
 pub struct EtcdClient {
     namespace: String,
diff --git a/ballista/scheduler/src/state/backend/memory.rs b/ballista/scheduler/src/state/backend/memory.rs
new file mode 100644
index 00000000..6cbedb84
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/memory.rs
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::state::backend::utils::subscriber::{Subscriber, Subscribers};
+use crate::state::backend::{
+    Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
+};
+use ballista_core::error::Result;
+use dashmap::DashMap;
+use futures::{FutureExt, Stream};
+use log::warn;
+use std::collections::{BTreeMap, HashSet};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+type KeySpaceState = BTreeMap<String, Vec<u8>>;
+type KeyLock = Arc<Mutex<()>>;
+
+/// A [`StateBackendClient`] implementation that uses in memory map to save cluster state.
+#[derive(Clone, Default)]
+pub struct MemoryBackendClient {
+    /// The key is the KeySpace. For every KeySpace, there will be a tree map which is better for prefix filtering
+    states: DashMap<String, KeySpaceState>,
+    /// The key is the full key formatted like "/KeySpace/key". It's a flatted map
+    locks: DashMap<String, KeyLock>,
+    subscribers: Arc<Subscribers>,
+}
+
+impl MemoryBackendClient {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    fn get_space_key(keyspace: &Keyspace) -> String {
+        format!("/{:?}", keyspace)
+    }
+
+    fn get_flat_key(keyspace: &Keyspace, key: &str) -> String {
+        format!("/{:?}/{}", keyspace, key)
+    }
+}
+
+#[tonic::async_trait]
+impl StateBackendClient for MemoryBackendClient {
+    async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| space_state.value().get(key).cloned().unwrap_or_default())
+            .unwrap_or_default())
+    }
+
+    async fn get_from_prefix(
+        &self,
+        keyspace: Keyspace,
+        prefix: &str,
+    ) -> Result<Vec<(String, Vec<u8>)>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| {
+                space_state
+                    .value()
+                    .range(prefix.to_owned()..)
+                    .take_while(|(k, _)| k.starts_with(prefix))
+                    .map(|e| (format!("{}/{}", space_key, e.0), e.1.clone()))
+                    .collect()
+            })
+            .unwrap_or_default())
+    }
+
+    async fn scan(
+        &self,
+        keyspace: Keyspace,
+        limit: Option<usize>,
+    ) -> Result<Vec<(String, Vec<u8>)>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| {
+                if let Some(limit) = limit {
+                    space_state
+                        .value()
+                        .iter()
+                        .take(limit)
+                        .map(|e| (format!("{}/{}", space_key, e.0), e.1.clone()))
+                        .collect::<Vec<(String, Vec<u8>)>>()
+                } else {
+                    space_state
+                        .value()
+                        .iter()
+                        .map(|e| (format!("{}/{}", space_key, e.0), e.1.clone()))
+                        .collect::<Vec<(String, Vec<u8>)>>()
+                }
+            })
+            .unwrap_or_default())
+    }
+
+    async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| {
+                space_state
+                    .value()
+                    .iter()
+                    .map(|e| format!("{}/{}", space_key, e.0))
+                    .collect::<HashSet<String>>()
+            })
+            .unwrap_or_default())
+    }
+
+    async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) -> Result<()> {
+        let space_key = Self::get_space_key(&keyspace);
+        if !self.states.contains_key(&space_key) {
+            self.states.insert(space_key.clone(), BTreeMap::default());
+        }
+        self.states
+            .get_mut(&space_key)
+            .unwrap()
+            .value_mut()
+            .insert(key.clone(), value.clone());
+
+        // Notify subscribers
+        let full_key = format!("{}/{}", space_key, key);
+        if let Some(res) = self.subscribers.reserve(&full_key) {
+            let event = WatchEvent::Put(full_key, value);
+            res.complete(&event);
+        }
+
+        Ok(())
+    }
+
+    /// Currently the locks should be acquired before invoking this method.
+    /// Later need to be refined by acquiring all of the related locks inside this method
+    async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> Result<()> {
+        for (op, keyspace, key) in ops.into_iter() {
+            match op {
+                Operation::Delete => {
+                    self.delete(keyspace, &key).await?;
+                }
+                Operation::Put(value) => {
+                    self.put(keyspace, key, value).await?;
+                }
+            };
+        }
+
+        Ok(())
+    }
+
+    /// Currently it's not used. Later will refine the caller side by leveraging this method
+    async fn mv(
+        &self,
+        from_keyspace: Keyspace,
+        to_keyspace: Keyspace,
+        key: &str,
+    ) -> Result<()> {
+        let from_space_key = Self::get_space_key(&from_keyspace);
+
+        let ops = if let Some(from_space_state) = self.states.get(&from_space_key) {
+            if let Some(state) = from_space_state.value().get(key) {
+                Some(vec![
+                    (Operation::Delete, from_keyspace, key.to_owned()),
+                    (Operation::Put(state.clone()), to_keyspace, key.to_owned()),
+                ])
+            } else {
+                // TODO should this return an error?
+                warn!(
+                    "Cannot move value at {}/{}, does not exist",
+                    from_space_key, key
+                );
+                None
+            }
+        } else {
+            // TODO should this return an error?
+            warn!(
+                "Cannot move value at {}/{}, does not exist",
+                from_space_key, key
+            );
+            None
+        };
+
+        if let Some(ops) = ops {
+            self.apply_txn(ops).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn Lock>> {
+        let flat_key = Self::get_flat_key(&keyspace, key);
+        let lock = self
+            .locks
+            .entry(flat_key)
+            .or_insert_with(|| Arc::new(Mutex::new(())));
+        Ok(Box::new(lock.value().clone().lock_owned().await))
+    }
+
+    async fn watch(&self, keyspace: Keyspace, prefix: String) -> Result<Box<dyn Watch>> {
+        let prefix = format!("/{:?}/{}", keyspace, prefix);
+
+        Ok(Box::new(MemoryWatch {
+            subscriber: self.subscribers.register(prefix.as_bytes()),
+        }))
+    }
+
+    async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()> {
+        let space_key = Self::get_space_key(&keyspace);
+        if let Some(mut space_state) = self.states.get_mut(&space_key) {
+            if space_state.value_mut().remove(key).is_some() {
+                // Notify subscribers
+                let full_key = format!("{}/{}", space_key, key);
+                if let Some(res) = self.subscribers.reserve(&full_key) {
+                    let event = WatchEvent::Delete(full_key);
+                    res.complete(&event);
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+struct MemoryWatch {
+    subscriber: Subscriber,
+}
+
+#[tonic::async_trait]
+impl Watch for MemoryWatch {
+    async fn cancel(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl Stream for MemoryWatch {
+    type Item = WatchEvent;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.get_mut().subscriber.poll_unpin(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.subscriber.size_hint()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{StateBackendClient, Watch, WatchEvent};
+
+    use crate::state::backend::memory::MemoryBackendClient;
+    use crate::state::backend::{Keyspace, Operation};
+    use crate::state::with_locks;
+    use futures::StreamExt;
+    use std::result::Result;
+
+    #[tokio::test]
+    async fn put_read() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        client
+            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
+            .await?;
+        assert_eq!(client.get(Keyspace::Slots, key).await?, value);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn put_move() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        client
+            .put(Keyspace::ActiveJobs, key.to_owned(), value.to_vec())
+            .await?;
+        client
+            .mv(Keyspace::ActiveJobs, Keyspace::FailedJobs, key)
+            .await?;
+        assert_eq!(client.get(Keyspace::FailedJobs, key).await?, value);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn multiple_operation() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key".to_string();
+        let value = "value".as_bytes().to_vec();
+        let locks = client
+            .acquire_locks(vec![(Keyspace::ActiveJobs, ""), (Keyspace::Slots, "")])
+            .await?;
+
+        let _r: ballista_core::error::Result<()> = with_locks(locks, async {
+            let txn_ops = vec![
+                (Operation::Put(value.clone()), Keyspace::Slots, key.clone()),
+                (
+                    Operation::Put(value.clone()),
+                    Keyspace::ActiveJobs,
+                    key.clone(),
+                ),
+            ];
+            client.apply_txn(txn_ops).await?;
+            Ok(())
+        })
+        .await;
+
+        assert_eq!(client.get(Keyspace::Slots, key.as_str()).await?, value);
+        assert_eq!(client.get(Keyspace::ActiveJobs, key.as_str()).await?, value);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_empty() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let empty: &[u8] = &[];
+        assert_eq!(client.get(Keyspace::Slots, key).await?, empty);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_prefix() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        client
+            .put(Keyspace::Slots, format!("{}/1", key), value.to_vec())
+            .await?;
+        client
+            .put(Keyspace::Slots, format!("{}/2", key), value.to_vec())
+            .await?;
+        assert_eq!(
+            client.get_from_prefix(Keyspace::Slots, key).await?,
+            vec![
+                ("/Slots/key/1".to_owned(), value.to_vec()),
+                ("/Slots/key/2".to_owned(), value.to_vec())
+            ]
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_watch() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        let mut watch_keyspace: Box<dyn Watch> =
+            client.watch(Keyspace::Slots, "".to_owned()).await?;
+        let mut watch_key: Box<dyn Watch> =
+            client.watch(Keyspace::Slots, key.to_owned()).await?;
+        client
+            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
+            .await?;
+        assert_eq!(
+            watch_keyspace.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value.to_owned()
+            ))
+        );
+        assert_eq!(
+            watch_key.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value.to_owned()
+            ))
+        );
+        let value2 = "value2".as_bytes();
+        client
+            .put(Keyspace::Slots, key.to_owned(), value2.to_vec())
+            .await?;
+        assert_eq!(
+            watch_keyspace.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value2.to_owned()
+            ))
+        );
+        assert_eq!(
+            watch_key.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value2.to_owned()
+            ))
+        );
+        watch_keyspace.cancel().await?;
+        watch_key.cancel().await?;
+        Ok(())
+    }
+}
diff --git a/ballista/scheduler/src/state/backend/mod.rs b/ballista/scheduler/src/state/backend/mod.rs
index cbba3ed1..0807f3c2 100644
--- a/ballista/scheduler/src/state/backend/mod.rs
+++ b/ballista/scheduler/src/state/backend/mod.rs
@@ -24,15 +24,18 @@ use tokio::sync::OwnedMutexGuard;
 
 #[cfg(feature = "etcd")]
 pub mod etcd;
+pub mod memory;
 #[cfg(feature = "sled")]
-pub mod standalone;
+pub mod sled;
+mod utils;
 
 // an enum used to configure the backend
 // needs to be visible to code generated by configure_me
 #[derive(Debug, Clone, ArgEnum, serde::Deserialize)]
 pub enum StateBackend {
     Etcd,
-    Standalone,
+    Memory,
+    Sled,
 }
 
 impl std::str::FromStr for StateBackend {
@@ -134,7 +137,7 @@ pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
     async fn cancel(&mut self) -> Result<()>;
 }
 
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub enum WatchEvent {
     /// Contains the inserted or updated key and the new value
     Put(String, Vec<u8>),
diff --git a/ballista/scheduler/src/state/backend/standalone.rs b/ballista/scheduler/src/state/backend/sled.rs
similarity index 96%
rename from ballista/scheduler/src/state/backend/standalone.rs
rename to ballista/scheduler/src/state/backend/sled.rs
index 57bf7470..e42da4ca 100644
--- a/ballista/scheduler/src/state/backend/standalone.rs
+++ b/ballista/scheduler/src/state/backend/sled.rs
@@ -29,15 +29,15 @@ use crate::state::backend::{
     Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
 };
 
-/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster configuration.
+/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster state.
 #[derive(Clone)]
-pub struct StandaloneClient {
+pub struct SledClient {
     db: sled::Db,
     locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
 }
 
-impl StandaloneClient {
-    /// Creates a StandaloneClient that saves data to the specified file.
+impl SledClient {
+    /// Creates a SledClient that saves data to the specified file.
     pub fn try_new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
         Ok(Self {
             db: sled::open(path).map_err(sled_to_ballista_error)?,
@@ -45,7 +45,7 @@ impl StandaloneClient {
         })
     }
 
-    /// Creates a StandaloneClient that saves data to a temp file.
+    /// Creates a SledClient that saves data to a temp file.
     pub fn try_new_temporary() -> Result<Self> {
         Ok(Self {
             db: sled::Config::new()
@@ -65,7 +65,7 @@ fn sled_to_ballista_error(e: sled::Error) -> BallistaError {
 }
 
 #[tonic::async_trait]
-impl StateBackendClient for StandaloneClient {
+impl StateBackendClient for SledClient {
     async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
         let key = format!("/{:?}/{}", keyspace, key);
         Ok(self
@@ -282,15 +282,15 @@ impl Stream for SledWatch {
 
 #[cfg(test)]
 mod tests {
-    use super::{StandaloneClient, StateBackendClient, Watch, WatchEvent};
+    use super::{SledClient, StateBackendClient, Watch, WatchEvent};
 
     use crate::state::backend::{Keyspace, Operation};
     use crate::state::with_locks;
     use futures::StreamExt;
     use std::result::Result;
 
-    fn create_instance() -> Result<StandaloneClient, Box<dyn std::error::Error>> {
-        Ok(StandaloneClient::try_new_temporary()?)
+    fn create_instance() -> Result<SledClient, Box<dyn std::error::Error>> {
+        Ok(SledClient::try_new_temporary()?)
     }
 
     #[tokio::test]
diff --git a/ballista/scheduler/src/state/backend/utils/mod.rs b/ballista/scheduler/src/state/backend/utils/mod.rs
new file mode 100644
index 00000000..de95dd6e
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/utils/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[allow(dead_code)]
+mod oneshot;
+#[allow(dead_code)]
+pub(crate) mod subscriber;
diff --git a/ballista/scheduler/src/state/backend/utils/oneshot.rs b/ballista/scheduler/src/state/backend/utils/oneshot.rs
new file mode 100644
index 00000000..a0d14699
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/utils/oneshot.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! It's mainly a modified version of sled::oneshot
+
+use std::{
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll, Waker},
+    time::{Duration, Instant},
+};
+
+use parking_lot::{Condvar, Mutex};
+
+#[derive(Debug)]
+struct OneShotState<T> {
+    filled: bool,
+    fused: bool,
+    item: Option<T>,
+    waker: Option<Waker>,
+}
+
+impl<T> Default for OneShotState<T> {
+    fn default() -> OneShotState<T> {
+        OneShotState {
+            filled: false,
+            fused: false,
+            item: None,
+            waker: None,
+        }
+    }
+}
+
+/// A Future value which may or may not be filled
+#[derive(Debug)]
+pub struct OneShot<T> {
+    mu: Arc<Mutex<OneShotState<T>>>,
+    cv: Arc<Condvar>,
+}
+
+/// The completer side of the Future
+pub struct OneShotFiller<T> {
+    mu: Arc<Mutex<OneShotState<T>>>,
+    cv: Arc<Condvar>,
+}
+
+impl<T> OneShot<T> {
+    /// Create a new `OneShotFiller` and the `OneShot`
+    /// that will be filled by its completion.
+    pub fn pair() -> (OneShotFiller<T>, Self) {
+        let mu = Arc::new(Mutex::new(OneShotState::default()));
+        let cv = Arc::new(Condvar::new());
+        let future = Self {
+            mu: mu.clone(),
+            cv: cv.clone(),
+        };
+        let filler = OneShotFiller { mu, cv };
+
+        (filler, future)
+    }
+
+    /// Block on the `OneShot`'s completion
+    /// or dropping of the `OneShotFiller`
+    pub fn wait(self) -> Option<T> {
+        let mut inner = self.mu.lock();
+        while !inner.filled {
+            self.cv.wait(&mut inner);
+        }
+        inner.item.take()
+    }
+
+    /// Block on the `OneShot`'s completion
+    /// or dropping of the `OneShotFiller`,
+    /// returning an error if not filled
+    /// before a given timeout or if the
+    /// system shuts down before then.
+    ///
+    /// Upon a successful receive, the
+    /// oneshot should be dropped, as it
+    /// will never yield that value again.
+    pub fn wait_timeout(
+        &mut self,
+        mut timeout: Duration,
+    ) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
+        let mut inner = self.mu.lock();
+        while !inner.filled {
+            let start = Instant::now();
+            let res = self.cv.wait_for(&mut inner, timeout);
+            if res.timed_out() {
+                return Err(std::sync::mpsc::RecvTimeoutError::Disconnected);
+            }
+            timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
+                timeout
+            } else {
+                Duration::from_nanos(0)
+            };
+        }
+        if let Some(item) = inner.item.take() {
+            Ok(item)
+        } else {
+            Err(std::sync::mpsc::RecvTimeoutError::Disconnected)
+        }
+    }
+}
+
+impl<T> Future for OneShot<T> {
+    type Output = Option<T>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut state = self.mu.lock();
+        if state.fused {
+            return Poll::Pending;
+        }
+        if state.filled {
+            state.fused = true;
+            Poll::Ready(state.item.take())
+        } else {
+            state.waker = Some(cx.waker().clone());
+            Poll::Pending
+        }
+    }
+}
+
+impl<T> OneShotFiller<T> {
+    /// Complete the `OneShot`
+    pub fn fill(self, inner: T) {
+        let mut state = self.mu.lock();
+
+        if let Some(waker) = state.waker.take() {
+            waker.wake();
+        }
+
+        state.filled = true;
+        state.item = Some(inner);
+
+        // having held the mutex makes this linearized
+        // with the notify below.
+        drop(state);
+
+        let _notified = self.cv.notify_all();
+    }
+}
+
+impl<T> Drop for OneShotFiller<T> {
+    fn drop(&mut self) {
+        let mut state = self.mu.lock();
+
+        if state.filled {
+            return;
+        }
+
+        if let Some(waker) = state.waker.take() {
+            waker.wake();
+        }
+
+        state.filled = true;
+
+        // having held the mutex makes this linearized
+        // with the notify below.
+        drop(state);
+
+        let _notified = self.cv.notify_all();
+    }
+}
diff --git a/ballista/scheduler/src/state/backend/utils/subscriber.rs b/ballista/scheduler/src/state/backend/utils/subscriber.rs
new file mode 100644
index 00000000..dd74b664
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/utils/subscriber.rs
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! It's mainly a modified version of sled::subscriber
+
+use crate::state::backend::utils::oneshot::{OneShot, OneShotFiller};
+use crate::state::backend::WatchEvent;
+
+use parking_lot::RwLock;
+use std::collections::{BTreeMap, HashMap};
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+use std::time::{Duration, Instant};
+
+static ID_GEN: AtomicUsize = AtomicUsize::new(0);
+
+type Senders = HashMap<usize, (Option<Waker>, SyncSender<OneShot<Option<WatchEvent>>>)>;
+
+/// Aynchronous, non-blocking subscriber:
+///
+/// `Subscription` implements `Future<Output=Option<Event>>`.
+///
+/// `while let Some(event) = (&mut subscriber).await { /* use it */ }`
+pub struct Subscriber {
+    id: usize,
+    rx: Receiver<OneShot<Option<WatchEvent>>>,
+    existing: Option<OneShot<Option<WatchEvent>>>,
+    home: Arc<RwLock<Senders>>,
+}
+
+impl Drop for Subscriber {
+    fn drop(&mut self) {
+        let mut w_senders = self.home.write();
+        w_senders.remove(&self.id);
+    }
+}
+
+impl Subscriber {
+    /// Attempts to wait for a value on this `Subscriber`, returning
+    /// an error if no event arrives within the provided `Duration`
+    /// or if the backing `Db` shuts down.
+    pub fn next_timeout(
+        &mut self,
+        mut timeout: Duration,
+    ) -> std::result::Result<WatchEvent, std::sync::mpsc::RecvTimeoutError> {
+        loop {
+            let start = Instant::now();
+            let mut future_rx = if let Some(future_rx) = self.existing.take() {
+                future_rx
+            } else {
+                self.rx.recv_timeout(timeout)?
+            };
+            timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
+                timeout
+            } else {
+                Duration::from_nanos(0)
+            };
+
+            let start = Instant::now();
+            match future_rx.wait_timeout(timeout) {
+                Ok(Some(event)) => return Ok(event),
+                Ok(None) => (),
+                Err(timeout_error) => {
+                    self.existing = Some(future_rx);
+                    return Err(timeout_error);
+                }
+            }
+            timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
+                timeout
+            } else {
+                Duration::from_nanos(0)
+            };
+        }
+    }
+}
+
+impl Future for Subscriber {
+    type Output = Option<WatchEvent>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        loop {
+            let mut future_rx = if let Some(future_rx) = self.existing.take() {
+                future_rx
+            } else {
+                match self.rx.try_recv() {
+                    Ok(future_rx) => future_rx,
+                    Err(TryRecvError::Empty) => break,
+                    Err(TryRecvError::Disconnected) => return Poll::Ready(None),
+                }
+            };
+
+            match Future::poll(Pin::new(&mut future_rx), cx) {
+                Poll::Ready(Some(event)) => return Poll::Ready(event),
+                Poll::Ready(None) => continue,
+                Poll::Pending => {
+                    self.existing = Some(future_rx);
+                    return Poll::Pending;
+                }
+            }
+        }
+        let mut home = self.home.write();
+        let entry = home.get_mut(&self.id).unwrap();
+        entry.0 = Some(cx.waker().clone());
+        Poll::Pending
+    }
+}
+
+impl Iterator for Subscriber {
+    type Item = WatchEvent;
+
+    fn next(&mut self) -> Option<WatchEvent> {
+        loop {
+            let future_rx = self.rx.recv().ok()?;
+            match future_rx.wait() {
+                Some(Some(event)) => return Some(event),
+                Some(None) => return None,
+                None => continue,
+            }
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct Subscribers {
+    watched: RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Senders>>>>,
+    ever_used: AtomicBool,
+}
+
+impl Drop for Subscribers {
+    fn drop(&mut self) {
+        let watched = self.watched.read();
+
+        for senders in watched.values() {
+            let senders = std::mem::take(&mut *senders.write());
+            for (_, (waker, sender)) in senders {
+                drop(sender);
+                if let Some(waker) = waker {
+                    waker.wake();
+                }
+            }
+        }
+    }
+}
+
+impl Subscribers {
+    pub(crate) fn register(&self, prefix: &[u8]) -> Subscriber {
+        self.ever_used.store(true, Relaxed);
+        let r_mu = {
+            let r_mu = self.watched.read();
+            if r_mu.contains_key(prefix) {
+                r_mu
+            } else {
+                drop(r_mu);
+                let mut w_mu = self.watched.write();
+                if !w_mu.contains_key(prefix) {
+                    let old = w_mu.insert(
+                        prefix.to_vec(),
+                        Arc::new(RwLock::new(HashMap::default())),
+                    );
+                    assert!(old.is_none());
+                }
+                drop(w_mu);
+                self.watched.read()
+            }
+        };
+
+        let (tx, rx) = sync_channel(1024);
+
+        let arc_senders = &r_mu[prefix];
+        let mut w_senders = arc_senders.write();
+
+        let id = ID_GEN.fetch_add(1, Relaxed);
+
+        w_senders.insert(id, (None, tx));
+
+        Subscriber {
+            id,
+            rx,
+            existing: None,
+            home: arc_senders.clone(),
+        }
+    }
+
+    pub(crate) fn reserve<R: AsRef<[u8]>>(&self, key: R) -> Option<ReservedBroadcast> {
+        if !self.ever_used.load(Relaxed) {
+            return None;
+        }
+
+        let r_mu = self.watched.read();
+        let prefixes = r_mu.iter().filter(|(k, _)| key.as_ref().starts_with(k));
+
+        let mut subscribers = vec![];
+
+        for (_, subs_rwl) in prefixes {
+            let subs = subs_rwl.read();
+
+            for (_id, (waker, sender)) in subs.iter() {
+                let (tx, rx) = OneShot::pair();
+                if sender.send(rx).is_err() {
+                    continue;
+                }
+                subscribers.push((waker.clone(), tx));
+            }
+        }
+
+        if subscribers.is_empty() {
+            None
+        } else {
+            Some(ReservedBroadcast { subscribers })
+        }
+    }
+}
+
+pub(crate) struct ReservedBroadcast {
+    subscribers: Vec<(Option<Waker>, OneShotFiller<Option<WatchEvent>>)>,
+}
+
+impl ReservedBroadcast {
+    pub fn complete(self, event: &WatchEvent) {
+        let iter = self.subscribers.into_iter();
+
+        for (waker, tx) in iter {
+            tx.fill(Some(event.clone()));
+            if let Some(waker) = waker {
+                waker.wake();
+            }
+        }
+    }
+}
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index d86674f1..165502b2 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -910,7 +910,7 @@ impl ExecutorHeartbeatListener {
 #[cfg(test)]
 mod test {
     use crate::config::SlotsPolicy;
-    use crate::state::backend::standalone::StandaloneClient;
+    use crate::state::backend::sled::SledClient;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
     use ballista_core::error::Result;
     use ballista_core::serde::scheduler::{
@@ -928,7 +928,7 @@ mod test {
     }
 
     async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
 
         let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
@@ -966,7 +966,7 @@ mod test {
     }
 
     async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
 
         let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
@@ -1021,7 +1021,7 @@ mod test {
 
         let executors = test_executors(10, 4);
 
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
 
         let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
@@ -1066,7 +1066,7 @@ mod test {
     }
 
     async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
 
         let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3c12d2c8..3c580280 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -428,7 +428,7 @@ pub async fn with_locks<Out, F: Future<Output = Out>>(
 
 #[cfg(test)]
 mod test {
-    use crate::state::backend::standalone::StandaloneClient;
+    use crate::state::backend::sled::SledClient;
     use crate::state::SchedulerState;
     use ballista_core::config::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
     use ballista_core::error::Result;
@@ -454,7 +454,7 @@ mod test {
     // We should free any reservations which are not assigned
     #[tokio::test]
     async fn test_offer_free_reservations() -> Result<()> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::new_with_default_scheduler_name(
                 state_storage,
@@ -490,7 +490,7 @@ mod test {
         let config = BallistaConfig::builder()
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
@@ -575,7 +575,7 @@ mod test {
         let config = BallistaConfig::builder()
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index 2566dcb5..b4526fd1 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -27,7 +27,7 @@ use async_trait::async_trait;
 use crate::config::SchedulerConfig;
 use crate::metrics::SchedulerMetricsCollector;
 use crate::scheduler_server::{timestamp_millis, SchedulerServer};
-use crate::state::backend::standalone::StandaloneClient;
+use crate::state::backend::sled::SledClient;
 
 use crate::state::executor_manager::ExecutorManager;
 use crate::state::task_manager::TaskLauncher;
@@ -380,7 +380,7 @@ impl SchedulerTest {
         task_slots_per_executor: usize,
         runner: Option<Arc<dyn TaskRunner>>,
     ) -> Result<Self> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
 
         let ballista_config = BallistaConfig::builder()
             .set(