You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2022/11/21 15:36:22 UTC
[arrow-ballista] branch master updated: Provide a memory StateBackendClient (#523)
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new b5fa8720 Provide a memory StateBackendClient (#523)
b5fa8720 is described below
commit b5fa8720b6a6b97654298ba3e609f2568b9118ae
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Nov 21 23:36:17 2022 +0800
Provide a memory StateBackendClient (#523)
* Rename StateBackend::Standalone to StateBackend::Sled
* Copy utility files from sled crate since they cannot be used directly
* Provide a memory StateBackendClient
* Fix dashmap deadlock issue
* Fix for the comments
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/scheduler/scheduler_config_spec.toml | 4 +-
ballista/scheduler/src/main.rs | 22 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 8 +-
ballista/scheduler/src/scheduler_server/mod.rs | 4 +-
ballista/scheduler/src/standalone.rs | 6 +-
ballista/scheduler/src/state/backend/etcd.rs | 2 +-
ballista/scheduler/src/state/backend/memory.rs | 411 +++++++++++++++++++++
ballista/scheduler/src/state/backend/mod.rs | 9 +-
.../src/state/backend/{standalone.rs => sled.rs} | 18 +-
ballista/scheduler/src/state/backend/utils/mod.rs | 21 ++
.../scheduler/src/state/backend/utils/oneshot.rs | 179 +++++++++
.../src/state/backend/utils/subscriber.rs | 248 +++++++++++++
ballista/scheduler/src/state/executor_manager.rs | 10 +-
ballista/scheduler/src/state/mod.rs | 8 +-
ballista/scheduler/src/test_utils.rs | 4 +-
15 files changed, 906 insertions(+), 48 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 451becf8..e79abb85 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -33,8 +33,8 @@ doc = "Route for proxying flight results via scheduler. Should be of the form 'I
abbr = "b"
name = "config_backend"
type = "ballista_scheduler::state::backend::StateBackend"
-doc = "The configuration backend for the scheduler, possible values: etcd, standalone. Default: standalone"
-default = "ballista_scheduler::state::backend::StateBackend::Standalone"
+doc = "The configuration backend for the scheduler, possible values: etcd, memory, sled. Default: sled"
+default = "ballista_scheduler::state::backend::StateBackend::Sled"
[[param]]
abbr = "n"
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 1141a519..66a90f3e 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -21,6 +21,7 @@ use anyhow::{Context, Result};
#[cfg(feature = "flight-sql")]
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_scheduler::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
+use ballista_scheduler::state::backend::memory::MemoryBackendClient;
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
use std::convert::Infallible;
@@ -37,7 +38,7 @@ use ballista_scheduler::api::{get_routes, EitherBody, Error};
#[cfg(feature = "etcd")]
use ballista_scheduler::state::backend::etcd::EtcdClient;
#[cfg(feature = "sled")]
-use ballista_scheduler::state::backend::standalone::StandaloneClient;
+use ballista_scheduler::state::backend::sled::SledClient;
use datafusion_proto::protobuf::LogicalPlanNode;
use ballista_scheduler::scheduler_server::SchedulerServer;
@@ -211,10 +212,6 @@ async fn main() -> Result<()> {
let addr = addr.parse()?;
let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend {
- #[cfg(not(any(feature = "sled", feature = "etcd")))]
- _ => std::compile_error!(
- "To build the scheduler enable at least one config backend feature (`etcd` or `sled`)"
- ),
#[cfg(feature = "etcd")]
StateBackend::Etcd => {
let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
@@ -229,26 +226,27 @@ async fn main() -> Result<()> {
)
}
#[cfg(feature = "sled")]
- StateBackend::Standalone => {
+ StateBackend::Sled => {
if opt.sled_dir.is_empty() {
Arc::new(
- StandaloneClient::try_new_temporary()
- .context("Could not create standalone config backend")?,
+ SledClient::try_new_temporary()
+ .context("Could not create sled config backend")?,
)
} else {
println!("{}", opt.sled_dir);
Arc::new(
- StandaloneClient::try_new(opt.sled_dir)
- .context("Could not create standalone config backend")?,
+ SledClient::try_new(opt.sled_dir)
+ .context("Could not create sled config backend")?,
)
}
}
#[cfg(not(feature = "sled"))]
- StateBackend::Standalone => {
+ StateBackend::Sled => {
unimplemented!(
- "build the scheduler with the `sled` feature to use the standalone config backend"
+ "build the scheduler with the `sled` feature to use the sled config backend"
)
}
+ StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
};
let config = SchedulerConfig {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 062e45a7..e61eb93c 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -587,13 +587,13 @@ mod test {
use ballista_core::utils::default_session_builder;
use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
- use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
+ use crate::state::{backend::sled::SledClient, SchedulerState};
use super::{SchedulerGrpc, SchedulerServer};
#[tokio::test]
async fn test_poll_work() -> Result<(), BallistaError> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
@@ -680,7 +680,7 @@ mod test {
#[tokio::test]
async fn test_stop_executor() -> Result<(), BallistaError> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
@@ -761,7 +761,7 @@ mod test {
#[tokio::test]
#[ignore]
async fn test_expired_executor() -> Result<(), BallistaError> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 04410974..f4727f40 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -330,7 +330,7 @@ mod test {
use ballista_core::serde::BallistaCodec;
use crate::scheduler_server::{timestamp_millis, SchedulerServer};
- use crate::state::backend::standalone::StandaloneClient;
+ use crate::state::backend::sled::SledClient;
use crate::test_utils::{
assert_completed_event, assert_failed_event, assert_no_submitted_event,
@@ -598,7 +598,7 @@ mod test {
async fn test_scheduler(
scheduling_policy: TaskSchedulingPolicy,
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index 975ad293..43f3e505 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -17,9 +17,7 @@
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
-use crate::{
- scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient,
-};
+use crate::{scheduler_server::SchedulerServer, state::backend::sled::SledClient};
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::create_grpc_server;
@@ -33,7 +31,7 @@ use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
- let client = StandaloneClient::try_new_temporary()?;
+ let client = SledClient::try_new_temporary()?;
let metrics_collector = default_metrics_collector()?;
diff --git a/ballista/scheduler/src/state/backend/etcd.rs b/ballista/scheduler/src/state/backend/etcd.rs
index f0d48b3f..5fc83aaa 100644
--- a/ballista/scheduler/src/state/backend/etcd.rs
+++ b/ballista/scheduler/src/state/backend/etcd.rs
@@ -34,7 +34,7 @@ use crate::state::backend::{
Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
};
-/// A [`StateBackendClient`] implementation that uses etcd to save cluster configuration.
+/// A [`StateBackendClient`] implementation that uses etcd to save cluster state.
#[derive(Clone)]
pub struct EtcdClient {
namespace: String,
diff --git a/ballista/scheduler/src/state/backend/memory.rs b/ballista/scheduler/src/state/backend/memory.rs
new file mode 100644
index 00000000..6cbedb84
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/memory.rs
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::state::backend::utils::subscriber::{Subscriber, Subscribers};
+use crate::state::backend::{
+ Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
+};
+use ballista_core::error::Result;
+use dashmap::DashMap;
+use futures::{FutureExt, Stream};
+use log::warn;
+use std::collections::{BTreeMap, HashSet};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Storage for one keyspace: the raw key (without the `/Keyspace/` prefix) mapped to its value bytes.
+/// A `BTreeMap` keeps keys ordered so prefix lookups can be done with a range scan.
+type KeySpaceState = BTreeMap<String, Vec<u8>>;
+/// Per-key async mutex handed out by `lock`; shared via `Arc` so the guard can be owned.
+type KeyLock = Arc<Mutex<()>>;
+
+/// A [`StateBackendClient`] implementation that uses in-memory maps to save cluster state.
+///
+/// All maps are held behind `Arc`, so clones of a client share the same state,
+/// locks and watch subscribers (matching the sharing semantics of the other
+/// backend clients). Without this, a clone would deep-copy the data maps while
+/// still sharing the subscriber registry, and would receive watch events for
+/// writes it could not read.
+#[derive(Clone, Default)]
+pub struct MemoryBackendClient {
+    /// Keyed by the keyspace key (formatted like "/Keyspace"). Each keyspace owns a
+    /// `BTreeMap`, whose key ordering makes prefix filtering a simple range scan.
+    states: Arc<DashMap<String, KeySpaceState>>,
+    /// Keyed by the full key formatted like "/Keyspace/key"; this map is flat,
+    /// i.e. not nested by keyspace.
+    locks: Arc<DashMap<String, KeyLock>>,
+    /// Registry of watchers that `put` and `delete` notify.
+    subscribers: Arc<Subscribers>,
+}
+
+impl MemoryBackendClient {
+    /// Creates an empty client: no keyspaces, no locks and no watchers.
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Builds the key identifying a whole keyspace, e.g. `/Slots`.
+    fn get_space_key(keyspace: &Keyspace) -> String {
+        format!("/{:?}", keyspace)
+    }
+
+    /// Builds the flat key for one entry, e.g. `/Slots/key`, by appending
+    /// `/` and `key` to the keyspace key.
+    fn get_flat_key(keyspace: &Keyspace, key: &str) -> String {
+        let mut flat_key = Self::get_space_key(keyspace);
+        flat_key.push('/');
+        flat_key.push_str(key);
+        flat_key
+    }
+}
+
+#[tonic::async_trait]
+impl StateBackendClient for MemoryBackendClient {
+    /// Returns the value stored under `key` in `keyspace`, or an empty `Vec`
+    /// when either the keyspace or the key does not exist.
+    async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .and_then(|space_state| space_state.value().get(key).cloned())
+            .unwrap_or_default())
+    }
+
+    /// Returns every entry of `keyspace` whose key starts with `prefix`, in key
+    /// order, with full keys formatted like "/Keyspace/key".
+    async fn get_from_prefix(
+        &self,
+        keyspace: Keyspace,
+        prefix: &str,
+    ) -> Result<Vec<(String, Vec<u8>)>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| {
+                // Keys are ordered, so all matches form one contiguous range
+                // starting at `prefix`.
+                space_state
+                    .value()
+                    .range(prefix.to_owned()..)
+                    .take_while(|(k, _)| k.starts_with(prefix))
+                    .map(|(k, v)| (format!("{}/{}", space_key, k), v.clone()))
+                    .collect()
+            })
+            .unwrap_or_default())
+    }
+
+    /// Returns up to `limit` entries of `keyspace` (all of them when `limit` is
+    /// `None`), in key order, with full keys formatted like "/Keyspace/key".
+    async fn scan(
+        &self,
+        keyspace: Keyspace,
+        limit: Option<usize>,
+    ) -> Result<Vec<(String, Vec<u8>)>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| {
+                space_state
+                    .value()
+                    .iter()
+                    .take(limit.unwrap_or(usize::MAX))
+                    .map(|(k, v)| (format!("{}/{}", space_key, k), v.clone()))
+                    .collect::<Vec<(String, Vec<u8>)>>()
+            })
+            .unwrap_or_default())
+    }
+
+    /// Returns the full keys ("/Keyspace/key") of every entry in `keyspace`.
+    async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>> {
+        let space_key = Self::get_space_key(&keyspace);
+        Ok(self
+            .states
+            .get(&space_key)
+            .map(|space_state| {
+                space_state
+                    .value()
+                    .keys()
+                    .map(|k| format!("{}/{}", space_key, k))
+                    .collect::<HashSet<String>>()
+            })
+            .unwrap_or_default())
+    }
+
+    /// Inserts or overwrites `key` in `keyspace`, then notifies matching watchers
+    /// with a `WatchEvent::Put`.
+    async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) -> Result<()> {
+        let space_key = Self::get_space_key(&keyspace);
+        // A single `entry` call creates the keyspace map atomically when it is
+        // missing. A separate `contains_key` + `insert` could let a concurrent
+        // `put` replace a freshly created map and lose that other write.
+        // The `RefMut` temporary (a shard write lock) is dropped at the end of
+        // this statement, before subscribers are notified.
+        self.states
+            .entry(space_key.clone())
+            .or_insert_with(BTreeMap::new)
+            .value_mut()
+            .insert(key.clone(), value.clone());
+
+        // Notify subscribers
+        let full_key = format!("{}/{}", space_key, key);
+        if let Some(res) = self.subscribers.reserve(&full_key) {
+            let event = WatchEvent::Put(full_key, value);
+            res.complete(&event);
+        }
+
+        Ok(())
+    }
+
+    /// Applies `ops` in order by delegating to `delete` / `put`.
+    ///
+    /// This is not atomic on its own: callers must acquire the locks for the
+    /// affected keys before invoking it. Later this should be refined to
+    /// acquire all of the related locks inside this method.
+    async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> Result<()> {
+        for (op, keyspace, key) in ops {
+            match op {
+                Operation::Delete => self.delete(keyspace, &key).await?,
+                Operation::Put(value) => self.put(keyspace, key, value).await?,
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Moves `key` from `from_keyspace` to `to_keyspace` via a delete followed
+    /// by a put. If the source value does not exist, a warning is logged and
+    /// `Ok(())` is returned.
+    ///
+    /// Currently it's not used. Later the caller side will be refined to use this method.
+    async fn mv(
+        &self,
+        from_keyspace: Keyspace,
+        to_keyspace: Keyspace,
+        key: &str,
+    ) -> Result<()> {
+        let from_space_key = Self::get_space_key(&from_keyspace);
+
+        // Copy the value out inside the closure so the DashMap read guard is
+        // released before `apply_txn` takes write guards on the same map.
+        let value = self
+            .states
+            .get(&from_space_key)
+            .and_then(|from_space_state| from_space_state.value().get(key).cloned());
+
+        match value {
+            Some(value) => {
+                self.apply_txn(vec![
+                    (Operation::Delete, from_keyspace, key.to_owned()),
+                    (Operation::Put(value), to_keyspace, key.to_owned()),
+                ])
+                .await?;
+            }
+            None => {
+                // TODO should this return an error?
+                warn!(
+                    "Cannot move value at {}/{}, does not exist",
+                    from_space_key, key
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Acquires the per-key async mutex for `key` in `keyspace`, creating it on
+    /// first use. The returned guard releases the lock when dropped.
+    async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn Lock>> {
+        let flat_key = Self::get_flat_key(&keyspace, key);
+        // Clone the `Arc` out and let the `RefMut` (which holds the shard's
+        // write lock) drop at the end of this statement. Holding it across the
+        // `.await` below would block every other `lock` call on that shard. It
+        // would also deadlock if the current holder of this mutex then tries to
+        // lock another key in the same shard (e.g. through `acquire_locks`).
+        let key_lock: KeyLock = self
+            .locks
+            .entry(flat_key)
+            .or_insert_with(|| Arc::new(Mutex::new(())))
+            .value()
+            .clone();
+        Ok(Box::new(key_lock.lock_owned().await))
+    }
+
+    /// Registers a watcher for all full keys starting with "/Keyspace/prefix".
+    async fn watch(&self, keyspace: Keyspace, prefix: String) -> Result<Box<dyn Watch>> {
+        let prefix = Self::get_flat_key(&keyspace, &prefix);
+
+        Ok(Box::new(MemoryWatch {
+            subscriber: self.subscribers.register(prefix.as_bytes()),
+        }))
+    }
+
+    /// Removes `key` from `keyspace`. Watchers are notified with a
+    /// `WatchEvent::Delete` only if the key actually existed.
+    async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()> {
+        let space_key = Self::get_space_key(&keyspace);
+        // The shard write guard stays held while subscribers are notified.
+        if let Some(mut space_state) = self.states.get_mut(&space_key) {
+            if space_state.value_mut().remove(key).is_some() {
+                // Notify subscribers
+                let full_key = format!("{}/{}", space_key, key);
+                if let Some(res) = self.subscribers.reserve(&full_key) {
+                    let event = WatchEvent::Delete(full_key);
+                    res.complete(&event);
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Stream of [`WatchEvent`]s for one `watch` registration, backed by a
+/// [`Subscriber`] from the client's shared registry.
+struct MemoryWatch {
+    subscriber: Subscriber,
+}
+
+#[tonic::async_trait]
+impl Watch for MemoryWatch {
+    /// No explicit teardown happens here; the subscriber is unregistered when
+    /// this watch (and therefore the subscriber) is dropped.
+    async fn cancel(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl Stream for MemoryWatch {
+    type Item = WatchEvent;
+
+    /// Delegates polling to the inner subscriber's future.
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        this.subscriber.poll_unpin(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let subscriber = &self.subscriber;
+        subscriber.size_hint()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{StateBackendClient, Watch, WatchEvent};
+
+    use crate::state::backend::memory::MemoryBackendClient;
+    use crate::state::backend::{Keyspace, Operation};
+    use crate::state::with_locks;
+    use futures::StreamExt;
+    use std::result::Result;
+
+    /// A value written with `put` can be read back with `get`.
+    #[tokio::test]
+    async fn put_read() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        client
+            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
+            .await?;
+        assert_eq!(client.get(Keyspace::Slots, key).await?, value);
+        Ok(())
+    }
+
+    /// `mv` makes the value readable in the destination keyspace and removes it
+    /// from the source keyspace.
+    #[tokio::test]
+    async fn put_move() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        let empty: &[u8] = &[];
+        client
+            .put(Keyspace::ActiveJobs, key.to_owned(), value.to_vec())
+            .await?;
+        client
+            .mv(Keyspace::ActiveJobs, Keyspace::FailedJobs, key)
+            .await?;
+        assert_eq!(client.get(Keyspace::FailedJobs, key).await?, value);
+        assert_eq!(client.get(Keyspace::ActiveJobs, key).await?, empty);
+        Ok(())
+    }
+
+    /// A transaction applied under locks writes to every listed keyspace.
+    #[tokio::test]
+    async fn multiple_operation() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key".to_string();
+        let value = "value".as_bytes().to_vec();
+        let locks = client
+            .acquire_locks(vec![(Keyspace::ActiveJobs, ""), (Keyspace::Slots, "")])
+            .await?;
+
+        let txn_result: ballista_core::error::Result<()> = with_locks(locks, async {
+            let txn_ops = vec![
+                (Operation::Put(value.clone()), Keyspace::Slots, key.clone()),
+                (
+                    Operation::Put(value.clone()),
+                    Keyspace::ActiveJobs,
+                    key.clone(),
+                ),
+            ];
+            client.apply_txn(txn_ops).await?;
+            Ok(())
+        })
+        .await;
+        // Surface transaction failures directly instead of discarding them.
+        txn_result?;
+
+        assert_eq!(client.get(Keyspace::Slots, key.as_str()).await?, value);
+        assert_eq!(client.get(Keyspace::ActiveJobs, key.as_str()).await?, value);
+        Ok(())
+    }
+
+    /// Reading a missing key yields an empty value.
+    #[tokio::test]
+    async fn read_empty() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let empty: &[u8] = &[];
+        assert_eq!(client.get(Keyspace::Slots, key).await?, empty);
+        Ok(())
+    }
+
+    /// `get_from_prefix` returns matching entries in key order with full keys.
+    #[tokio::test]
+    async fn read_prefix() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        client
+            .put(Keyspace::Slots, format!("{}/1", key), value.to_vec())
+            .await?;
+        client
+            .put(Keyspace::Slots, format!("{}/2", key), value.to_vec())
+            .await?;
+        assert_eq!(
+            client.get_from_prefix(Keyspace::Slots, key).await?,
+            vec![
+                ("/Slots/key/1".to_owned(), value.to_vec()),
+                ("/Slots/key/2".to_owned(), value.to_vec())
+            ]
+        );
+        Ok(())
+    }
+
+    /// `delete` removes the value and emits a `Delete` event after the `Put`.
+    #[tokio::test]
+    async fn put_delete() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        let empty: &[u8] = &[];
+        let mut watch_key: Box<dyn Watch> =
+            client.watch(Keyspace::Slots, key.to_owned()).await?;
+        client
+            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
+            .await?;
+        client.delete(Keyspace::Slots, key).await?;
+        assert_eq!(client.get(Keyspace::Slots, key).await?, empty);
+        assert_eq!(
+            watch_key.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key),
+                value.to_owned()
+            ))
+        );
+        assert_eq!(
+            watch_key.next().await,
+            Some(WatchEvent::Delete(format!("/{:?}/{}", Keyspace::Slots, key)))
+        );
+        watch_key.cancel().await?;
+        Ok(())
+    }
+
+    /// Both a keyspace-wide watch and a key watch see every `Put`.
+    #[tokio::test]
+    async fn read_watch() -> Result<(), Box<dyn std::error::Error>> {
+        let client = MemoryBackendClient::new();
+        let key = "key";
+        let value = "value".as_bytes();
+        let mut watch_keyspace: Box<dyn Watch> =
+            client.watch(Keyspace::Slots, "".to_owned()).await?;
+        let mut watch_key: Box<dyn Watch> =
+            client.watch(Keyspace::Slots, key.to_owned()).await?;
+        client
+            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
+            .await?;
+        assert_eq!(
+            watch_keyspace.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value.to_owned()
+            ))
+        );
+        assert_eq!(
+            watch_key.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value.to_owned()
+            ))
+        );
+        let value2 = "value2".as_bytes();
+        client
+            .put(Keyspace::Slots, key.to_owned(), value2.to_vec())
+            .await?;
+        assert_eq!(
+            watch_keyspace.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value2.to_owned()
+            ))
+        );
+        assert_eq!(
+            watch_key.next().await,
+            Some(WatchEvent::Put(
+                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
+                value2.to_owned()
+            ))
+        );
+        watch_keyspace.cancel().await?;
+        watch_key.cancel().await?;
+        Ok(())
+    }
+}
diff --git a/ballista/scheduler/src/state/backend/mod.rs b/ballista/scheduler/src/state/backend/mod.rs
index cbba3ed1..0807f3c2 100644
--- a/ballista/scheduler/src/state/backend/mod.rs
+++ b/ballista/scheduler/src/state/backend/mod.rs
@@ -24,15 +24,18 @@ use tokio::sync::OwnedMutexGuard;
#[cfg(feature = "etcd")]
pub mod etcd;
+pub mod memory;
#[cfg(feature = "sled")]
-pub mod standalone;
+pub mod sled;
+mod utils;
// an enum used to configure the backend
// needs to be visible to code generated by configure_me
#[derive(Debug, Clone, ArgEnum, serde::Deserialize)]
pub enum StateBackend {
Etcd,
- Standalone,
+ Memory,
+ Sled,
}
impl std::str::FromStr for StateBackend {
@@ -134,7 +137,7 @@ pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
async fn cancel(&mut self) -> Result<()>;
}
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatchEvent {
/// Contains the inserted or updated key and the new value
Put(String, Vec<u8>),
diff --git a/ballista/scheduler/src/state/backend/standalone.rs b/ballista/scheduler/src/state/backend/sled.rs
similarity index 96%
rename from ballista/scheduler/src/state/backend/standalone.rs
rename to ballista/scheduler/src/state/backend/sled.rs
index 57bf7470..e42da4ca 100644
--- a/ballista/scheduler/src/state/backend/standalone.rs
+++ b/ballista/scheduler/src/state/backend/sled.rs
@@ -29,15 +29,15 @@ use crate::state::backend::{
Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
};
-/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster configuration.
+/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster state.
#[derive(Clone)]
-pub struct StandaloneClient {
+pub struct SledClient {
db: sled::Db,
locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
}
-impl StandaloneClient {
- /// Creates a StandaloneClient that saves data to the specified file.
+impl SledClient {
+ /// Creates a SledClient that saves data to the specified file.
pub fn try_new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
Ok(Self {
db: sled::open(path).map_err(sled_to_ballista_error)?,
@@ -45,7 +45,7 @@ impl StandaloneClient {
})
}
- /// Creates a StandaloneClient that saves data to a temp file.
+ /// Creates a SledClient that saves data to a temp file.
pub fn try_new_temporary() -> Result<Self> {
Ok(Self {
db: sled::Config::new()
@@ -65,7 +65,7 @@ fn sled_to_ballista_error(e: sled::Error) -> BallistaError {
}
#[tonic::async_trait]
-impl StateBackendClient for StandaloneClient {
+impl StateBackendClient for SledClient {
async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
let key = format!("/{:?}/{}", keyspace, key);
Ok(self
@@ -282,15 +282,15 @@ impl Stream for SledWatch {
#[cfg(test)]
mod tests {
- use super::{StandaloneClient, StateBackendClient, Watch, WatchEvent};
+ use super::{SledClient, StateBackendClient, Watch, WatchEvent};
use crate::state::backend::{Keyspace, Operation};
use crate::state::with_locks;
use futures::StreamExt;
use std::result::Result;
- fn create_instance() -> Result<StandaloneClient, Box<dyn std::error::Error>> {
- Ok(StandaloneClient::try_new_temporary()?)
+ fn create_instance() -> Result<SledClient, Box<dyn std::error::Error>> {
+ Ok(SledClient::try_new_temporary()?)
}
#[tokio::test]
diff --git a/ballista/scheduler/src/state/backend/utils/mod.rs b/ballista/scheduler/src/state/backend/utils/mod.rs
new file mode 100644
index 00000000..de95dd6e
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/utils/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Both modules are adapted from sled (see their module docs). `dead_code` is
+// allowed because, presumably, not every copied helper is used by the memory
+// backend — confirm before removing the attributes.
+#[allow(dead_code)]
+mod oneshot;
+#[allow(dead_code)]
+pub(crate) mod subscriber;
diff --git a/ballista/scheduler/src/state/backend/utils/oneshot.rs b/ballista/scheduler/src/state/backend/utils/oneshot.rs
new file mode 100644
index 00000000..a0d14699
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/utils/oneshot.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A modified version of `sled::oneshot`, copied into this crate because it cannot be used directly from the sled crate.
+
+use std::{
+ future::Future,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll, Waker},
+ time::{Duration, Instant},
+};
+
+use parking_lot::{Condvar, Mutex};
+
+/// Shared state between a [`OneShot`] and its [`OneShotFiller`], guarded by a mutex.
+#[derive(Debug)]
+struct OneShotState<T> {
+    /// Set once the filler has either filled the value or been dropped.
+    filled: bool,
+    /// Set after the `Future` impl has yielded its result once.
+    fused: bool,
+    /// The filled value; `None` before filling, after it is taken, or when the
+    /// filler was dropped without filling.
+    item: Option<T>,
+    /// Waker of the last task that polled the `OneShot` while it was unfilled.
+    waker: Option<Waker>,
+}
+
+// Implemented by hand so that `T` is not required to implement `Default`.
+impl<T> Default for OneShotState<T> {
+    fn default() -> Self {
+        Self {
+            item: None,
+            waker: None,
+            filled: false,
+            fused: false,
+        }
+    }
+}
+
+/// A Future value which may or may not be filled.
+///
+/// It can be awaited (via its `Future` impl) or waited on synchronously
+/// (`wait`, `wait_timeout`).
+#[derive(Debug)]
+pub struct OneShot<T> {
+ /// State shared with the paired `OneShotFiller`.
+ mu: Arc<Mutex<OneShotState<T>>>,
+ /// Notified by the filler so blocking waiters re-check `mu`.
+ cv: Arc<Condvar>,
+}
+
+/// The completer side of the Future: fills the paired `OneShot`, or marks it
+/// filled without a value when dropped.
+pub struct OneShotFiller<T> {
+ /// State shared with the paired `OneShot`.
+ mu: Arc<Mutex<OneShotState<T>>>,
+ /// Used to wake threads blocked in `OneShot::wait`/`wait_timeout`.
+ cv: Arc<Condvar>,
+}
+
+impl<T> OneShot<T> {
+    /// Create a new `OneShotFiller` and the `OneShot`
+    /// that will be filled by its completion.
+    pub fn pair() -> (OneShotFiller<T>, Self) {
+        let mu = Arc::new(Mutex::new(OneShotState::default()));
+        let cv = Arc::new(Condvar::new());
+        let future = Self {
+            mu: mu.clone(),
+            cv: cv.clone(),
+        };
+        let filler = OneShotFiller { mu, cv };
+
+        (filler, future)
+    }
+
+    /// Block the current thread until the `OneShot` is filled or its
+    /// `OneShotFiller` is dropped.
+    ///
+    /// Returns `None` if the filler was dropped without filling.
+    pub fn wait(self) -> Option<T> {
+        let mut inner = self.mu.lock();
+        while !inner.filled {
+            self.cv.wait(&mut inner);
+        }
+        inner.item.take()
+    }
+
+    /// Block the current thread until the `OneShot` is filled, its
+    /// `OneShotFiller` is dropped, or `timeout` elapses.
+    ///
+    /// # Errors
+    /// * `RecvTimeoutError::Timeout` if nothing was filled within `timeout`.
+    /// * `RecvTimeoutError::Disconnected` if the filler was dropped without
+    ///   filling, or the value was already taken by an earlier call.
+    ///
+    /// Upon a successful receive, the oneshot should be dropped, as it
+    /// will never yield that value again.
+    pub fn wait_timeout(
+        &mut self,
+        mut timeout: Duration,
+    ) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
+        let mut inner = self.mu.lock();
+        while !inner.filled {
+            let start = Instant::now();
+            let res = self.cv.wait_for(&mut inner, timeout);
+            // Re-check `filled`: the filler may have completed right as the
+            // wait timed out, in which case the value is still delivered.
+            if res.timed_out() && !inner.filled {
+                return Err(std::sync::mpsc::RecvTimeoutError::Timeout);
+            }
+            // Spurious wakeup or late fill: wait only for the remaining time.
+            timeout = timeout.saturating_sub(start.elapsed());
+        }
+        inner
+            .item
+            .take()
+            .ok_or(std::sync::mpsc::RecvTimeoutError::Disconnected)
+    }
+}
+
+impl<T> Future for OneShot<T> {
+    type Output = Option<T>;
+
+    /// Resolves to the filled value, or `None` if the filler was dropped
+    /// without filling. After resolving once the future is fused: every later
+    /// poll returns `Poll::Pending` without registering a waker.
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut state = self.mu.lock();
+        match (state.fused, state.filled) {
+            (true, _) => Poll::Pending,
+            (false, true) => {
+                state.fused = true;
+                Poll::Ready(state.item.take())
+            }
+            (false, false) => {
+                // Remember who to wake once the filler completes.
+                state.waker = Some(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+impl<T> OneShotFiller<T> {
+    /// Complete the `OneShot` with `inner`. This wakes the task that last
+    /// polled it (if any) and notifies threads blocked in `wait`/`wait_timeout`.
+    pub fn fill(self, inner: T) {
+        let mut state = self.mu.lock();
+
+        let pending_waker = state.waker.take();
+        if let Some(waker) = pending_waker {
+            waker.wake();
+        }
+
+        state.filled = true;
+        state.item = Some(inner);
+
+        // Releasing the mutex only after the state change linearizes it with
+        // the condvar notification below.
+        drop(state);
+
+        let _ = self.cv.notify_all();
+    }
+}
+
+impl<T> Drop for OneShotFiller<T> {
+    /// If the filler is dropped without `fill`, mark the `OneShot` as filled
+    /// with no item so that waiters observe an empty result instead of waiting
+    /// forever. Does nothing if `fill` already ran.
+    fn drop(&mut self) {
+        let mut state = self.mu.lock();
+
+        if !state.filled {
+            if let Some(waker) = state.waker.take() {
+                waker.wake();
+            }
+
+            state.filled = true;
+
+            // Release the mutex before notifying; holding it until now
+            // linearizes the state change with the notification.
+            drop(state);
+
+            let _ = self.cv.notify_all();
+        }
+    }
+}
diff --git a/ballista/scheduler/src/state/backend/utils/subscriber.rs b/ballista/scheduler/src/state/backend/utils/subscriber.rs
new file mode 100644
index 00000000..dd74b664
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/utils/subscriber.rs
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mostly a modified version of the `subscriber` module from the `sled` crate.
+
+use crate::state::backend::utils::oneshot::{OneShot, OneShotFiller};
+use crate::state::backend::WatchEvent;
+
+use parking_lot::RwLock;
+use std::collections::{BTreeMap, HashMap};
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+use std::time::{Duration, Instant};
+
+/// Sender table for one watched prefix: subscriber id -> (waker stored by
+/// that subscriber's `poll`, if any; channel used to hand it one pending
+/// one-shot per reserved event).
+type Senders = HashMap<usize, (Option<Waker>, SyncSender<OneShot<Option<WatchEvent>>>)>;
+
+/// Counter that hands out subscriber ids.
+static ID_GEN: AtomicUsize = AtomicUsize::new(0);
+
+/// Asynchronous, non-blocking subscriber to watch events under a key prefix.
+///
+/// `Subscriber` implements `Future<Output = Option<WatchEvent>>`, so it can be
+/// awaited repeatedly:
+///
+/// `while let Some(event) = (&mut subscriber).await { /* use it */ }`
+///
+/// It can also be consumed as a blocking `Iterator<Item = WatchEvent>`, or
+/// waited on with a deadline via `next_timeout`.
+pub struct Subscriber {
+    /// Key of this subscriber's entry in `home`.
+    id: usize,
+    /// Receives one pending one-shot per reserved event.
+    rx: Receiver<OneShot<Option<WatchEvent>>>,
+    /// A one-shot already taken from `rx` whose value was not yet delivered.
+    existing: Option<OneShot<Option<WatchEvent>>>,
+    /// Sender table of the prefix this subscriber was registered on.
+    home: Arc<RwLock<Senders>>,
+}
+
+impl Drop for Subscriber {
+    /// Removes this subscriber from its prefix's sender table so later
+    /// reservations stop sending to it.
+    ///
+    /// NOTE(review): `Subscribers::reserve` does a blocking `send` on this
+    /// subscriber's bounded channel (capacity 1024 in `register`) while
+    /// holding the read lock on the same table. If the channel is full, the
+    /// write lock below waits on that send, which only progresses once `rx`
+    /// is drained or dropped — and `rx` is dropped only after this returns.
+    /// Confirm whether that deadlock is reachable in practice.
+    fn drop(&mut self) {
+        self.home.write().remove(&self.id);
+    }
+}
+
+impl Subscriber {
+    /// Blocks the current thread until the next event arrives or `timeout`
+    /// has elapsed.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the `RecvTimeoutError` from the subscription channel or from
+    /// the pending one-shot, e.g. when nothing arrives in time or the sending
+    /// side has gone away. A one-shot whose wait fails is kept in `existing`,
+    /// so a later call (or `poll`) resumes on it instead of losing it.
+    pub fn next_timeout(
+        &mut self,
+        mut timeout: Duration,
+    ) -> std::result::Result<WatchEvent, std::sync::mpsc::RecvTimeoutError> {
+        loop {
+            let started = Instant::now();
+            let mut pending = match self.existing.take() {
+                Some(pending) => pending,
+                None => self.rx.recv_timeout(timeout)?,
+            };
+            timeout = timeout.saturating_sub(started.elapsed());
+
+            let started = Instant::now();
+            match pending.wait_timeout(timeout) {
+                Ok(Some(event)) => return Ok(event),
+                Ok(None) => {}
+                Err(error) => {
+                    self.existing = Some(pending);
+                    return Err(error);
+                }
+            }
+            timeout = timeout.saturating_sub(started.elapsed());
+        }
+    }
+}
+
+impl Future for Subscriber {
+    type Output = Option<WatchEvent>;
+
+    /// Resolves to the next event, or to `None` once the subscription channel
+    /// is disconnected.
+    ///
+    /// `Subscribers::reserve` snapshots this subscriber's waker while sending
+    /// under the `home` read lock. The waker is therefore registered *before*
+    /// concluding that nothing is queued, and the channel is checked again
+    /// afterwards: a one-shot sent concurrently is then either seen by the
+    /// re-check or carries the freshly registered waker.
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut waker_registered = false;
+        loop {
+            let mut future_rx = if let Some(future_rx) = self.existing.take() {
+                future_rx
+            } else {
+                match self.rx.try_recv() {
+                    Ok(future_rx) => future_rx,
+                    Err(TryRecvError::Empty) => {
+                        if waker_registered {
+                            return Poll::Pending;
+                        }
+                        let registered = match self.home.write().get_mut(&self.id) {
+                            Some(entry) => {
+                                entry.0 = Some(cx.waker().clone());
+                                true
+                            }
+                            None => false,
+                        };
+                        if !registered {
+                            // `Subscribers::drop` has already taken our entry and
+                            // is about to drop our sender. Ask to be polled again
+                            // so we observe the disconnect instead of hanging.
+                            cx.waker().wake_by_ref();
+                            return Poll::Pending;
+                        }
+                        waker_registered = true;
+                        // Re-check the channel now that the waker is visible.
+                        continue;
+                    }
+                    Err(TryRecvError::Disconnected) => return Poll::Ready(None),
+                }
+            };
+
+            match Future::poll(Pin::new(&mut future_rx), cx) {
+                Poll::Ready(Some(event)) => return Poll::Ready(event),
+                // Filler dropped without a value: move on to the next one-shot.
+                Poll::Ready(None) => continue,
+                Poll::Pending => {
+                    self.existing = Some(future_rx);
+                    return Poll::Pending;
+                }
+            }
+        }
+    }
+}
+
+impl Iterator for Subscriber {
+    type Item = WatchEvent;
+
+    /// Blocks until the next event arrives; returns `None` once the
+    /// subscription channel is disconnected.
+    ///
+    /// A one-shot left in `existing` by an earlier `poll` or `next_timeout` is
+    /// waited on first, so mixing consumption styles does not skip an event.
+    fn next(&mut self) -> Option<WatchEvent> {
+        loop {
+            let future_rx = match self.existing.take() {
+                Some(future_rx) => future_rx,
+                None => self.rx.recv().ok()?,
+            };
+            match future_rx.wait() {
+                Some(Some(event)) => return Some(event),
+                Some(None) => return None,
+                None => continue,
+            }
+        }
+    }
+}
+
+/// Registry of watch subscriptions, keyed by watched key prefix.
+#[derive(Default, Debug)]
+pub(crate) struct Subscribers {
+    /// Watched prefix -> shared sender table of that prefix's subscribers.
+    watched: RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Senders>>>>,
+    /// Set by `register`; lets `reserve` return early without taking any
+    /// lock when nobody has ever subscribed.
+    ever_used: AtomicBool,
+}
+
+impl Drop for Subscribers {
+    /// Disconnects every subscriber by emptying each sender table, dropping
+    /// each sender and then waking that subscriber's stored waker (if any) so
+    /// its task observes the disconnect.
+    fn drop(&mut self) {
+        let watched = self.watched.read();
+        watched
+            .values()
+            .flat_map(|home| std::mem::take(&mut *home.write()))
+            .for_each(|(_id, (waker, sender))| {
+                // Sender goes first so the woken task sees `Disconnected`.
+                drop(sender);
+                if let Some(waker) = waker {
+                    waker.wake();
+                }
+            });
+    }
+}
+
+impl Subscribers {
+    /// Registers a new subscriber for every key starting with `prefix`.
+    pub(crate) fn register(&self, prefix: &[u8]) -> Subscriber {
+        self.ever_used.store(true, Relaxed);
+
+        // Fast path under the shared lock; otherwise insert the prefix under
+        // the exclusive lock and re-acquire a shared one. Nothing in this
+        // module removes entries from `watched`.
+        let watched = {
+            let shared = self.watched.read();
+            if shared.contains_key(prefix) {
+                shared
+            } else {
+                drop(shared);
+                self.watched
+                    .write()
+                    .entry(prefix.to_vec())
+                    .or_insert_with(|| Arc::new(RwLock::new(HashMap::default())));
+                self.watched.read()
+            }
+        };
+
+        let (tx, rx) = sync_channel(1024);
+        let home = &watched[prefix];
+        let id = {
+            let mut senders = home.write();
+            let id = ID_GEN.fetch_add(1, Relaxed);
+            senders.insert(id, (None, tx));
+            id
+        };
+
+        Subscriber {
+            id,
+            rx,
+            existing: None,
+            home: Arc::clone(home),
+        }
+    }
+
+    /// Hands a pending one-shot to every subscriber whose prefix matches
+    /// `key`. Returns `None` when no subscriber accepted one.
+    pub(crate) fn reserve<R: AsRef<[u8]>>(&self, key: R) -> Option<ReservedBroadcast> {
+        // Nobody has ever subscribed: skip locking entirely.
+        if !self.ever_used.load(Relaxed) {
+            return None;
+        }
+
+        let key = key.as_ref();
+        let watched = self.watched.read();
+        let mut subscribers = Vec::new();
+
+        for (prefix, home) in watched.iter() {
+            if !key.starts_with(prefix) {
+                continue;
+            }
+            let senders = home.read();
+            for (waker, sender) in senders.values() {
+                let (filler, one_shot) = OneShot::pair();
+                // A send error means the subscriber dropped its receiver.
+                if sender.send(one_shot).is_ok() {
+                    subscribers.push((waker.clone(), filler));
+                }
+            }
+        }
+
+        (!subscribers.is_empty()).then(|| ReservedBroadcast { subscribers })
+    }
+}
+
+/// Subscribers that `Subscribers::reserve` handed a pending one-shot for a
+/// single event, waiting for that event to be delivered.
+pub(crate) struct ReservedBroadcast {
+    /// Snapshot of each subscriber's stored waker at reservation time, paired
+    /// with the filler for the one-shot sent to it.
+    subscribers: Vec<(Option<Waker>, OneShotFiller<Option<WatchEvent>>)>,
+}
+
+impl ReservedBroadcast {
+    /// Delivers a clone of `event` to every reserved subscriber, filling its
+    /// one-shot and then waking its snapshotted waker (if any).
+    pub fn complete(self, event: &WatchEvent) {
+        self.subscribers.into_iter().for_each(|(waker, filler)| {
+            filler.fill(Some(event.clone()));
+            if let Some(waker) = waker {
+                waker.wake();
+            }
+        });
+    }
+}
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index d86674f1..165502b2 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -910,7 +910,7 @@ impl ExecutorHeartbeatListener {
#[cfg(test)]
mod test {
use crate::config::SlotsPolicy;
- use crate::state::backend::standalone::StandaloneClient;
+ use crate::state::backend::sled::SledClient;
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use ballista_core::error::Result;
use ballista_core::serde::scheduler::{
@@ -928,7 +928,7 @@ mod test {
}
async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
@@ -966,7 +966,7 @@ mod test {
}
async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
@@ -1021,7 +1021,7 @@ mod test {
let executors = test_executors(10, 4);
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
@@ -1066,7 +1066,7 @@ mod test {
}
async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3c12d2c8..3c580280 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -428,7 +428,7 @@ pub async fn with_locks<Out, F: Future<Output = Out>>(
#[cfg(test)]
mod test {
- use crate::state::backend::standalone::StandaloneClient;
+ use crate::state::backend::sled::SledClient;
use crate::state::SchedulerState;
use ballista_core::config::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
use ballista_core::error::Result;
@@ -454,7 +454,7 @@ mod test {
// We should free any reservations which are not assigned
#[tokio::test]
async fn test_offer_free_reservations() -> Result<()> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
Arc::new(SchedulerState::new_with_default_scheduler_name(
state_storage,
@@ -490,7 +490,7 @@ mod test {
let config = BallistaConfig::builder()
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
.build()?;
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
Arc::new(SchedulerState::with_task_launcher(
state_storage,
@@ -575,7 +575,7 @@ mod test {
let config = BallistaConfig::builder()
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
.build()?;
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
Arc::new(SchedulerState::with_task_launcher(
state_storage,
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index 2566dcb5..b4526fd1 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -27,7 +27,7 @@ use async_trait::async_trait;
use crate::config::SchedulerConfig;
use crate::metrics::SchedulerMetricsCollector;
use crate::scheduler_server::{timestamp_millis, SchedulerServer};
-use crate::state::backend::standalone::StandaloneClient;
+use crate::state::backend::sled::SledClient;
use crate::state::executor_manager::ExecutorManager;
use crate::state::task_manager::TaskLauncher;
@@ -380,7 +380,7 @@ impl SchedulerTest {
task_slots_per_executor: usize,
runner: Option<Arc<dyn TaskRunner>>,
) -> Result<Self> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+ let state_storage = Arc::new(SledClient::try_new_temporary()?);
let ballista_config = BallistaConfig::builder()
.set(