You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2023/07/13 01:26:41 UTC
[arrow-ballista] branch main updated: Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 5dfdfeaf Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)
5dfdfeaf is described below
commit 5dfdfeaf57f953b62992ac3c1ae592dfb655f631
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Jul 13 09:26:37 2023 +0800
Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)
* Remove ExecutorReservation and change the task assignment philosophy from executor first to task first
* Add PENDING_JOBS_METRIC and RUNNING_JOBS_METRIC to replace INFLIGHT_TASKS_METRIC_NAME
---------
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/scheduler/src/cluster/kv.rs | 271 +++------
ballista/scheduler/src/cluster/memory.rs | 216 ++------
ballista/scheduler/src/cluster/mod.rs | 474 +++++++++++++---
ballista/scheduler/src/cluster/storage/mod.rs | 11 -
ballista/scheduler/src/cluster/test/mod.rs | 587 --------------------
ballista/scheduler/src/cluster/test_util/mod.rs | 214 ++++++++
ballista/scheduler/src/scheduler_server/event.rs | 18 +-
.../src/scheduler_server/external_scaler.rs | 21 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 114 ++--
ballista/scheduler/src/scheduler_server/mod.rs | 77 +--
.../src/scheduler_server/query_stage_scheduler.rs | 436 +++++----------
ballista/scheduler/src/state/execution_graph.rs | 79 ++-
ballista/scheduler/src/state/executor_manager.rs | 568 +++++--------------
ballista/scheduler/src/state/mod.rs | 608 ++++++---------------
ballista/scheduler/src/state/task_manager.rs | 129 ++---
ballista/scheduler/src/test_utils.rs | 26 +-
16 files changed, 1412 insertions(+), 2437 deletions(-)
diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs
index effbc0cc..a8852fb6 100644
--- a/ballista/scheduler/src/cluster/kv.rs
+++ b/ballista/scheduler/src/cluster/kv.rs
@@ -17,13 +17,14 @@
use crate::cluster::storage::{KeyValueStore, Keyspace, Lock, Operation, WatchEvent};
use crate::cluster::{
- reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
- JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
+ bind_task_bias, bind_task_round_robin, BoundTask, ClusterState,
+ ExecutorHeartbeatStream, ExecutorSlot, JobState, JobStateEvent, JobStateEventStream,
+ JobStatus, TaskDistribution,
};
use crate::scheduler_server::{timestamp_secs, SessionBuilder};
use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
use crate::state::session_manager::create_datafusion_context;
+use crate::state::task_manager::JobInfoCache;
use crate::state::{decode_into, decode_protobuf};
use async_trait::async_trait;
use ballista_core::config::BallistaConfig;
@@ -174,12 +175,12 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
Ok(())
}
- async fn reserve_slots(
+ async fn bind_schedulable_tasks(
&self,
- num_slots: u32,
distribution: TaskDistribution,
+ active_jobs: Arc<HashMap<String, JobInfoCache>>,
executors: Option<HashSet<String>>,
- ) -> Result<Vec<ExecutorReservation>> {
+ ) -> Result<Vec<BoundTask>> {
let lock = self.store.lock(Keyspace::Slots, "global").await?;
with_lock(lock, async {
@@ -192,7 +193,7 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
))
})?;
- let mut available_slots: Vec<&mut AvailableTaskSlots> = slots
+ let available_slots: Vec<&mut AvailableTaskSlots> = slots
.task_slots
.iter_mut()
.filter_map(|data| {
@@ -205,82 +206,33 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
})
.collect();
- available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
- let reservations = match distribution {
- TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
+ let bound_tasks = match distribution {
+ TaskDistribution::Bias => {
+ bind_task_bias(available_slots, active_jobs, |_| false).await
+ }
TaskDistribution::RoundRobin => {
- reserve_slots_round_robin(available_slots, num_slots)
+ bind_task_round_robin(available_slots, active_jobs, |_| false).await
}
};
- if !reservations.is_empty() {
+ if !bound_tasks.is_empty() {
self.store
.put(Keyspace::Slots, "all".to_owned(), slots.encode_to_vec())
.await?
}
- Ok(reservations)
+ Ok(bound_tasks)
})
.await
}
- async fn reserve_slots_exact(
- &self,
- num_slots: u32,
- distribution: TaskDistribution,
- executors: Option<HashSet<String>>,
- ) -> Result<Vec<ExecutorReservation>> {
- let lock = self.store.lock(Keyspace::Slots, "global").await?;
-
- with_lock(lock, async {
- let resources = self.store.get(Keyspace::Slots, "all").await?;
-
- let mut slots =
- ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
- BallistaError::Internal(format!(
- "Unexpected value in executor slots state: {err:?}"
- ))
- })?;
-
- let mut available_slots: Vec<&mut AvailableTaskSlots> = slots
- .task_slots
- .iter_mut()
- .filter_map(|data| {
- (data.slots > 0
- && executors
- .as_ref()
- .map(|executors| executors.contains(&data.executor_id))
- .unwrap_or(true))
- .then_some(data)
- })
- .collect();
-
- available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
- let reservations = match distribution {
- TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
- TaskDistribution::RoundRobin => {
- reserve_slots_round_robin(available_slots, num_slots)
- }
- };
-
- if reservations.len() == num_slots as usize {
- self.store
- .put(Keyspace::Slots, "all".to_owned(), slots.encode_to_vec())
- .await?;
- Ok(reservations)
- } else {
- Ok(vec![])
- }
- })
- .await
- }
+ async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
+ let mut increments = HashMap::new();
+ for (executor_id, num_slots) in executor_slots {
+ let v = increments.entry(executor_id).or_insert_with(|| 0);
+ *v += num_slots;
+ }
- async fn cancel_reservations(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<()> {
let lock = self.store.lock(Keyspace::Slots, "all").await?;
with_lock(lock, async {
@@ -293,18 +245,9 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
))
})?;
- let mut increments = HashMap::new();
- for ExecutorReservation { executor_id, .. } in reservations {
- if let Some(inc) = increments.get_mut(&executor_id) {
- *inc += 1;
- } else {
- increments.insert(executor_id, 1usize);
- }
- }
-
for executor_slots in slots.task_slots.iter_mut() {
if let Some(slots) = increments.get(&executor_slots.executor_id) {
- executor_slots.slots += *slots as u32;
+ executor_slots.slots += *slots;
}
}
@@ -319,8 +262,7 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
&self,
metadata: ExecutorMetadata,
spec: ExecutorData,
- reserve: bool,
- ) -> Result<Vec<ExecutorReservation>> {
+ ) -> Result<()> {
let executor_id = metadata.id.clone();
//TODO this should be in a transaction
@@ -338,83 +280,40 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
})
.await?;
- if !reserve {
- let available_slots = AvailableTaskSlots {
- executor_id,
- slots: spec.available_task_slots,
- };
-
- let lock = self.store.lock(Keyspace::Slots, "all").await?;
+ let available_slots = AvailableTaskSlots {
+ executor_id,
+ slots: spec.available_task_slots,
+ };
- with_lock(lock, async {
- let current_slots = self.store.get(Keyspace::Slots, "all").await?;
+ let lock = self.store.lock(Keyspace::Slots, "all").await?;
- let mut current_slots: ExecutorTaskSlots =
- decode_protobuf(current_slots.as_slice())?;
+ with_lock(lock, async {
+ let current_slots = self.store.get(Keyspace::Slots, "all").await?;
- if let Some((idx, _)) =
- current_slots.task_slots.iter().find_position(|slots| {
- slots.executor_id == available_slots.executor_id
- })
- {
- current_slots.task_slots[idx] = available_slots;
- } else {
- current_slots.task_slots.push(available_slots);
- }
+ let mut current_slots: ExecutorTaskSlots =
+ decode_protobuf(current_slots.as_slice())?;
- self.store
- .put(
- Keyspace::Slots,
- "all".to_string(),
- current_slots.encode_to_vec(),
- )
- .await
- })
- .await?;
-
- Ok(vec![])
- } else {
- let num_slots = spec.available_task_slots as usize;
- let mut reservations: Vec<ExecutorReservation> = vec![];
- for _ in 0..num_slots {
- reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+ if let Some((idx, _)) = current_slots
+ .task_slots
+ .iter()
+ .find_position(|slots| slots.executor_id == available_slots.executor_id)
+ {
+ current_slots.task_slots[idx] = available_slots;
+ } else {
+ current_slots.task_slots.push(available_slots);
}
- let available_slots = AvailableTaskSlots {
- executor_id,
- slots: 0,
- };
-
- let lock = self.store.lock(Keyspace::Slots, "all").await?;
-
- with_lock(lock, async {
- let current_slots = self.store.get(Keyspace::Slots, "all").await?;
-
- let mut current_slots: ExecutorTaskSlots =
- decode_protobuf(current_slots.as_slice())?;
-
- if let Some((idx, _)) =
- current_slots.task_slots.iter().find_position(|slots| {
- slots.executor_id == available_slots.executor_id
- })
- {
- current_slots.task_slots[idx] = available_slots;
- } else {
- current_slots.task_slots.push(available_slots);
- }
-
- self.store
- .put(
- Keyspace::Slots,
- "all".to_string(),
- current_slots.encode_to_vec(),
- )
- .await
- })
- .await?;
+ self.store
+ .put(
+ Keyspace::Slots,
+ "all".to_string(),
+ current_slots.encode_to_vec(),
+ )
+ .await
+ })
+ .await?;
- Ok(reservations)
- }
+ Ok(())
}
async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
@@ -502,18 +401,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> JobState
for KeyValueState<S, T, U>
{
- async fn accept_job(
- &self,
- job_id: &str,
- job_name: &str,
- queued_at: u64,
- ) -> Result<()> {
+ fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
self.queued_jobs
.insert(job_id.to_string(), (job_name.to_string(), queued_at));
Ok(())
}
+ fn pending_job_number(&self) -> usize {
+ self.queued_jobs.len()
+ }
+
async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()> {
if self.queued_jobs.get(&job_id).is_some() {
let status = graph.status();
@@ -774,13 +672,10 @@ async fn with_lock<Out, F: Future<Output = Out>>(mut lock: Box<dyn Lock>, op: F)
#[cfg(test)]
mod test {
+
use crate::cluster::kv::KeyValueState;
use crate::cluster::storage::sled::SledClient;
- use crate::cluster::test::{
- test_executor_registration, test_fuzz_reservations, test_job_lifecycle,
- test_job_planning_failure, test_reservation,
- };
- use crate::cluster::TaskDistribution;
+ use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
use crate::test_utils::{
test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
};
@@ -788,48 +683,6 @@ mod test {
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;
- #[cfg(feature = "sled")]
- fn make_sled_state() -> Result<KeyValueState<SledClient>> {
- Ok(KeyValueState::new(
- "",
- SledClient::try_new_temporary()?,
- BallistaCodec::default(),
- default_session_builder,
- ))
- }
-
- #[cfg(feature = "sled")]
- #[tokio::test]
- async fn test_sled_executor_reservation() -> Result<()> {
- test_executor_registration(make_sled_state()?).await
- }
-
- #[cfg(feature = "sled")]
- #[tokio::test]
- async fn test_sled_reserve() -> Result<()> {
- test_reservation(make_sled_state()?, TaskDistribution::Bias).await?;
- test_reservation(make_sled_state()?, TaskDistribution::RoundRobin).await?;
-
- Ok(())
- }
-
- #[cfg(feature = "sled")]
- #[tokio::test]
- async fn test_sled_fuzz_reserve() -> Result<()> {
- test_fuzz_reservations(make_sled_state()?, 10, TaskDistribution::Bias, 10, 10)
- .await?;
- test_fuzz_reservations(
- make_sled_state()?,
- 10,
- TaskDistribution::RoundRobin,
- 10,
- 10,
- )
- .await?;
-
- Ok(())
- }
-
#[cfg(feature = "sled")]
#[tokio::test]
async fn test_sled_job_lifecycle() -> Result<()> {
@@ -854,4 +707,14 @@ mod test {
Ok(())
}
+
+ #[cfg(feature = "sled")]
+ fn make_sled_state() -> Result<KeyValueState<SledClient>> {
+ Ok(KeyValueState::new(
+ "",
+ SledClient::try_new_temporary()?,
+ BallistaCodec::default(),
+ default_session_builder,
+ ))
+ }
}
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 9a8a6fc9..6060e79c 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -16,17 +16,16 @@
// under the License.
use crate::cluster::{
- reserve_slots_bias, reserve_slots_round_robin, ClusterState, JobState, JobStateEvent,
- JobStateEventStream, JobStatus, TaskDistribution,
+ bind_task_bias, bind_task_round_robin, BoundTask, ClusterState, ExecutorSlot,
+ JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
};
use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
use async_trait::async_trait;
use ballista_core::config::BallistaConfig;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::{
- executor_status, AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus,
- ExecutorTaskSlots, FailedJob, QueuedJob,
+ executor_status, AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus, FailedJob,
+ QueuedJob,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use dashmap::DashMap;
@@ -35,20 +34,20 @@ use datafusion::prelude::SessionContext;
use crate::cluster::event::ClusterEventSender;
use crate::scheduler_server::{timestamp_millis, timestamp_secs, SessionBuilder};
use crate::state::session_manager::create_datafusion_context;
+use crate::state::task_manager::JobInfoCache;
use ballista_core::serde::protobuf::job_status::Status;
-use itertools::Itertools;
use log::warn;
-use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
use std::sync::Arc;
+use tokio::sync::Mutex;
use tracing::debug;
#[derive(Default)]
pub struct InMemoryClusterState {
/// Current available task slots for each executor
- task_slots: Mutex<ExecutorTaskSlots>,
+ task_slots: Mutex<HashMap<String, AvailableTaskSlots>>,
/// Current executors
executors: DashMap<String, ExecutorMetadata>,
/// Last heartbeat received for each executor
@@ -57,17 +56,16 @@ pub struct InMemoryClusterState {
#[async_trait]
impl ClusterState for InMemoryClusterState {
- async fn reserve_slots(
+ async fn bind_schedulable_tasks(
&self,
- num_slots: u32,
distribution: TaskDistribution,
+ active_jobs: Arc<HashMap<String, JobInfoCache>>,
executors: Option<HashSet<String>>,
- ) -> Result<Vec<ExecutorReservation>> {
- let mut guard = self.task_slots.lock();
+ ) -> Result<Vec<BoundTask>> {
+ let mut guard = self.task_slots.lock().await;
- let mut available_slots: Vec<&mut AvailableTaskSlots> = guard
- .task_slots
- .iter_mut()
+ let available_slots: Vec<&mut AvailableTaskSlots> = guard
+ .values_mut()
.filter_map(|data| {
(data.slots > 0
&& executors
@@ -78,76 +76,30 @@ impl ClusterState for InMemoryClusterState {
})
.collect();
- available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
- let reservations = match distribution {
- TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
- TaskDistribution::RoundRobin => {
- reserve_slots_round_robin(available_slots, num_slots)
+ let schedulable_tasks = match distribution {
+ TaskDistribution::Bias => {
+ bind_task_bias(available_slots, active_jobs, |_| false).await
}
- };
-
- Ok(reservations)
- }
-
- async fn reserve_slots_exact(
- &self,
- num_slots: u32,
- distribution: TaskDistribution,
- executors: Option<HashSet<String>>,
- ) -> Result<Vec<ExecutorReservation>> {
- let mut guard = self.task_slots.lock();
-
- let rollback = guard.clone();
-
- let mut available_slots: Vec<&mut AvailableTaskSlots> = guard
- .task_slots
- .iter_mut()
- .filter_map(|data| {
- (data.slots > 0
- && executors
- .as_ref()
- .map(|executors| executors.contains(&data.executor_id))
- .unwrap_or(true))
- .then_some(data)
- })
- .collect();
-
- available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
-
- let reservations = match distribution {
- TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
TaskDistribution::RoundRobin => {
- reserve_slots_round_robin(available_slots, num_slots)
+ bind_task_round_robin(available_slots, active_jobs, |_| false).await
}
};
- if reservations.len() as u32 != num_slots {
- *guard = rollback;
- Ok(vec![])
- } else {
- Ok(reservations)
- }
+ Ok(schedulable_tasks)
}
- async fn cancel_reservations(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<()> {
+ async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
let mut increments = HashMap::new();
- for ExecutorReservation { executor_id, .. } in reservations {
- if let Some(inc) = increments.get_mut(&executor_id) {
- *inc += 1;
- } else {
- increments.insert(executor_id, 1usize);
- }
+ for (executor_id, num_slots) in executor_slots {
+ let v = increments.entry(executor_id).or_insert_with(|| 0);
+ *v += num_slots;
}
- let mut guard = self.task_slots.lock();
+ let mut guard = self.task_slots.lock().await;
- for executor_slots in guard.task_slots.iter_mut() {
- if let Some(slots) = increments.get(&executor_slots.executor_id) {
- executor_slots.slots += *slots as u32;
+ for (executor_id, num_slots) in increments {
+ if let Some(mut data) = guard.get_mut(&executor_id) {
+ data.slots += num_slots;
}
}
@@ -157,9 +109,8 @@ impl ClusterState for InMemoryClusterState {
async fn register_executor(
&self,
metadata: ExecutorMetadata,
- mut spec: ExecutorData,
- reserve: bool,
- ) -> Result<Vec<ExecutorReservation>> {
+ spec: ExecutorData,
+ ) -> Result<()> {
let executor_id = metadata.id.clone();
self.save_executor_metadata(metadata).await?;
@@ -173,37 +124,17 @@ impl ClusterState for InMemoryClusterState {
})
.await?;
- let mut guard = self.task_slots.lock();
-
- // Check to see if we already have task slots for executor. If so, remove them.
- if let Some((idx, _)) = guard
- .task_slots
- .iter()
- .find_position(|slots| slots.executor_id == executor_id)
- {
- guard.task_slots.swap_remove(idx);
- }
-
- if reserve {
- let slots = std::mem::take(&mut spec.available_task_slots) as usize;
- let reservations = (0..slots)
- .map(|_| ExecutorReservation::new_free(executor_id.clone()))
- .collect();
+ let mut guard = self.task_slots.lock().await;
- guard.task_slots.push(AvailableTaskSlots {
- executor_id,
- slots: 0,
- });
-
- Ok(reservations)
- } else {
- guard.task_slots.push(AvailableTaskSlots {
+ guard.insert(
+ executor_id.clone(),
+ AvailableTaskSlots {
executor_id,
slots: spec.available_task_slots,
- });
+ },
+ );
- Ok(vec![])
- }
+ Ok(())
}
async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
@@ -235,15 +166,9 @@ impl ClusterState for InMemoryClusterState {
async fn remove_executor(&self, executor_id: &str) -> Result<()> {
{
- let mut guard = self.task_slots.lock();
-
- if let Some((idx, _)) = guard
- .task_slots
- .iter()
- .find_position(|slots| slots.executor_id == executor_id)
- {
- guard.task_slots.swap_remove(idx);
- }
+ let mut guard = self.task_slots.lock().await;
+
+ guard.remove(executor_id);
}
self.heartbeats.remove(executor_id);
@@ -299,7 +224,8 @@ impl InMemoryJobState {
impl JobState for InMemoryJobState {
async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()> {
if self.queued_jobs.get(&job_id).is_some() {
- self.running_jobs.insert(job_id.clone(), graph.status());
+ self.running_jobs
+ .insert(job_id.clone(), graph.status().clone());
self.queued_jobs.remove(&job_id);
self.job_event_sender.send(&JobStateEvent::JobAcquired {
@@ -352,7 +278,7 @@ impl JobState for InMemoryJobState {
}
async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> Result<()> {
- let status = graph.status();
+ let status = graph.status().clone();
debug!("saving state for job {job_id} with status {:?}", status);
@@ -365,7 +291,7 @@ impl JobState for InMemoryJobState {
.insert(job_id.to_string(), (status, Some(graph.clone())));
self.running_jobs.remove(job_id);
} else if let Some(old_status) =
- self.running_jobs.insert(job_id.to_string(), graph.status())
+ self.running_jobs.insert(job_id.to_string(), status)
{
self.job_event_sender.send(&JobStateEvent::JobUpdated {
job_id: job_id.to_string(),
@@ -433,18 +359,17 @@ impl JobState for InMemoryJobState {
.collect())
}
- async fn accept_job(
- &self,
- job_id: &str,
- job_name: &str,
- queued_at: u64,
- ) -> Result<()> {
+ fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
self.queued_jobs
.insert(job_id.to_string(), (job_name.to_string(), queued_at));
Ok(())
}
+ fn pending_job_number(&self) -> usize {
+ self.queued_jobs.len()
+ }
+
async fn fail_unscheduled_job(&self, job_id: &str, reason: String) -> Result<()> {
if let Some((job_id, (job_name, queued_at))) = self.queued_jobs.remove(job_id) {
self.completed_jobs.insert(
@@ -475,57 +400,14 @@ impl JobState for InMemoryJobState {
#[cfg(test)]
mod test {
- use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
- use crate::cluster::test::{
- test_executor_registration, test_fuzz_reservations, test_job_lifecycle,
- test_job_planning_failure, test_reservation,
- };
- use crate::cluster::TaskDistribution;
+ use crate::cluster::memory::InMemoryJobState;
+ use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
use crate::test_utils::{
test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
};
use ballista_core::error::Result;
use ballista_core::utils::default_session_builder;
- #[tokio::test]
- async fn test_in_memory_registration() -> Result<()> {
- test_executor_registration(InMemoryClusterState::default()).await
- }
-
- #[tokio::test]
- async fn test_in_memory_reserve() -> Result<()> {
- test_reservation(InMemoryClusterState::default(), TaskDistribution::Bias).await?;
- test_reservation(
- InMemoryClusterState::default(),
- TaskDistribution::RoundRobin,
- )
- .await?;
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_in_memory_fuzz_reserve() -> Result<()> {
- test_fuzz_reservations(
- InMemoryClusterState::default(),
- 10,
- TaskDistribution::Bias,
- 10,
- 10,
- )
- .await?;
- test_fuzz_reservations(
- InMemoryClusterState::default(),
- 10,
- TaskDistribution::RoundRobin,
- 10,
- 10,
- )
- .await?;
-
- Ok(())
- }
-
#[tokio::test]
async fn test_in_memory_job_lifecycle() -> Result<()> {
test_job_lifecycle(
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 0d8ae900..87680ac0 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -22,7 +22,7 @@ pub mod storage;
#[cfg(test)]
#[allow(clippy::uninlined_format_args)]
-pub mod test;
+pub mod test_util;
use crate::cluster::kv::KeyValueState;
use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
@@ -31,20 +31,23 @@ use crate::cluster::storage::sled::SledClient;
use crate::cluster::storage::KeyValueStore;
use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
+use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
+use crate::state::task_manager::JobInfoCache;
use ballista_core::config::BallistaConfig;
use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::protobuf::{AvailableTaskSlots, ExecutorHeartbeat, JobStatus};
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use ballista_core::serde::protobuf::{
+ job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus,
+};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, PartitionId};
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;
use clap::ArgEnum;
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use futures::Stream;
-use log::info;
+use log::{debug, info, warn};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::pin::Pin;
@@ -195,6 +198,13 @@ impl BallistaCluster {
/// by any schedulers with a shared `ClusterState`
pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item = ExecutorHeartbeat> + Send>>;
+/// A task bound with an executor to execute.
+/// `BoundTask.0` is the executor id, while `BoundTask.1` is the task description.
+pub type BoundTask = (String, TaskDescription);
+
+/// `ExecutorSlot.0` is the executor id, while `ExecutorSlot.1` is the number of slots.
+pub type ExecutorSlot = (String, u32);
+
/// A trait that contains the necessary method to maintain a globally consistent view of cluster resources
#[tonic::async_trait]
pub trait ClusterState: Send + Sync + 'static {
@@ -203,45 +213,28 @@ pub trait ClusterState: Send + Sync + 'static {
Ok(())
}
- /// Reserve up to `num_slots` executor task slots. If not enough task slots are available, reserve
- /// as many as possible.
+ /// Bind the ready-to-run tasks from `active_jobs` to available executors.
///
- /// If `executors` is provided, only reserve slots of the specified executor IDs
- async fn reserve_slots(
+ /// If `executors` is provided, only bind slots from the specified executor IDs
+ async fn bind_schedulable_tasks(
&self,
- num_slots: u32,
distribution: TaskDistribution,
+ active_jobs: Arc<HashMap<String, JobInfoCache>>,
executors: Option<HashSet<String>>,
- ) -> Result<Vec<ExecutorReservation>>;
+ ) -> Result<Vec<BoundTask>>;
- /// Reserve exactly `num_slots` executor task slots. If not enough task slots are available,
- /// returns an empty vec
+ /// Unbind an executor and a task when the task finishes or fails. This increases the executor's
+ /// available task slots.
///
- /// If `executors` is provided, only reserve slots of the specified executor IDs
- async fn reserve_slots_exact(
- &self,
- num_slots: u32,
- distribution: TaskDistribution,
- executors: Option<HashSet<String>>,
- ) -> Result<Vec<ExecutorReservation>>;
-
- /// Cancel the specified reservations. This will make reserved executor slots available to other
- /// tasks.
/// This operation should be atomic. Either all reservations are cancelled or none are.
- async fn cancel_reservations(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<()>;
+ async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()>;
- /// Register a new executor in the cluster. If `reserve` is true, then the executors task slots
- /// will be reserved and returned in the response and none of the new executors task slots will be
- /// available to other tasks.
+ /// Register a new executor in the cluster.
async fn register_executor(
&self,
metadata: ExecutorMetadata,
spec: ExecutorData,
- reserve: bool,
- ) -> Result<Vec<ExecutorReservation>>;
+ ) -> Result<()>;
/// Save the executor metadata. This will overwrite existing metadata for the executor ID
async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()>;
@@ -309,12 +302,11 @@ pub trait JobState: Send + Sync {
/// Accept job into a scheduler's job queue. This should be called when a job is
/// received by the scheduler but before it is planned and may or may not be saved
/// in global state
- async fn accept_job(
- &self,
- job_id: &str,
- job_name: &str,
- queued_at: u64,
- ) -> Result<()>;
+ fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()>;
+
+ /// Get the number of queued jobs. A large value means the scheduler is too busy.
+ /// Under normal conditions it should be 0.
+ fn pending_job_number(&self) -> usize;
/// Submit a new job to the `JobState`. It is assumed that the submitter owns the job.
/// In local state the job should be save as `JobStatus::Active` and in shared state
@@ -376,66 +368,382 @@ pub trait JobState: Send + Sync {
) -> Result<Option<Arc<SessionContext>>>;
}
-pub(crate) fn reserve_slots_bias(
+pub(crate) async fn bind_task_bias(
mut slots: Vec<&mut AvailableTaskSlots>,
- mut n: u32,
-) -> Vec<ExecutorReservation> {
- let mut reservations = Vec::with_capacity(n as usize);
-
- let mut iter = slots.iter_mut();
-
- while n > 0 {
- if let Some(executor) = iter.next() {
- let take = executor.slots.min(n);
- for _ in 0..take {
- reservations
- .push(ExecutorReservation::new_free(executor.executor_id.clone()));
- }
+ active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
+) -> Vec<BoundTask> {
+ let mut schedulable_tasks: Vec<BoundTask> = vec![];
+
+ let total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
+ if total_slots == 0 {
+ warn!("Not enough available executor slots for task running!!!");
+ return schedulable_tasks;
+ }
- executor.slots -= take;
- n -= take;
- } else {
- break;
+ // Sort the executors in descending order of their available slot number
+ slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
+
+ let mut idx_slot = 0usize;
+ let mut slot = &mut slots[idx_slot];
+ for (job_id, job_info) in active_jobs.iter() {
+ if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
+ debug!(
+ "Job {} is not in running status and will be skipped",
+ job_id
+ );
+ continue;
+ }
+ let mut graph = job_info.execution_graph.write().await;
+ let session_id = graph.session_id().to_string();
+ let mut black_list = vec![];
+ while let Some((running_stage, task_id_gen)) =
+ graph.fetch_running_stage(&black_list)
+ {
+ if if_skip(running_stage.plan.clone()) {
+ info!(
+ "Will skip stage {}/{} for bias task binding",
+ job_id, running_stage.stage_id
+ );
+ black_list.push(running_stage.stage_id);
+ continue;
+ }
+ // We are sure that at least one task will be bound by going through the following logic,
+ // so this loop will not run forever.
+ let runnable_tasks = running_stage
+ .task_infos
+ .iter_mut()
+ .enumerate()
+ .filter(|(_partition, info)| info.is_none())
+ .take(total_slots as usize)
+ .collect::<Vec<_>>();
+ for (partition_id, task_info) in runnable_tasks {
+ // Advance [`slot`] to the next executor whose available slot number is larger than 0
+ while slot.slots == 0 {
+ idx_slot += 1;
+ if idx_slot >= slots.len() {
+ return schedulable_tasks;
+ }
+ slot = &mut slots[idx_slot];
+ }
+ let executor_id = slot.executor_id.clone();
+ let task_id = *task_id_gen;
+ *task_id_gen += 1;
+ *task_info = Some(create_task_info(executor_id.clone(), task_id));
+
+ let partition = PartitionId {
+ job_id: job_id.clone(),
+ stage_id: running_stage.stage_id,
+ partition_id,
+ };
+ let task_desc = TaskDescription {
+ session_id: session_id.clone(),
+ partition,
+ stage_attempt_num: running_stage.stage_attempt_num,
+ task_id,
+ task_attempt: running_stage.task_failure_numbers[partition_id],
+ plan: running_stage.plan.clone(),
+ };
+ schedulable_tasks.push((executor_id, task_desc));
+
+ slot.slots -= 1;
+ }
}
}
- reservations
+ schedulable_tasks
}
-pub(crate) fn reserve_slots_round_robin(
+pub(crate) async fn bind_task_round_robin(
mut slots: Vec<&mut AvailableTaskSlots>,
- mut n: u32,
-) -> Vec<ExecutorReservation> {
- let mut reservations = Vec::with_capacity(n as usize);
+ active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
+) -> Vec<BoundTask> {
+ let mut schedulable_tasks: Vec<BoundTask> = vec![];
+
+ let mut total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
+ if total_slots == 0 {
+ warn!("Not enough available executor slots for task running!!!");
+ return schedulable_tasks;
+ }
+ info!("Total slot number is {}", total_slots);
+
+ // Sort the slots by descending order
+ slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
+
+ let mut idx_slot = 0usize;
+ for (job_id, job_info) in active_jobs.iter() {
+ if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
+ debug!(
+ "Job {} is not in running status and will be skipped",
+ job_id
+ );
+ continue;
+ }
+ let mut graph = job_info.execution_graph.write().await;
+ let session_id = graph.session_id().to_string();
+ let mut black_list = vec![];
+ while let Some((running_stage, task_id_gen)) =
+ graph.fetch_running_stage(&black_list)
+ {
+ if if_skip(running_stage.plan.clone()) {
+ info!(
+ "Will skip stage {}/{} for round robin task binding",
+ job_id, running_stage.stage_id
+ );
+ black_list.push(running_stage.stage_id);
+ continue;
+ }
+ // We are sure that at least one task will be bound by going through the following logic,
+ // so this loop will not run forever.
+ let runnable_tasks = running_stage
+ .task_infos
+ .iter_mut()
+ .enumerate()
+ .filter(|(_partition, info)| info.is_none())
+ .take(total_slots as usize)
+ .collect::<Vec<_>>();
+ for (partition_id, task_info) in runnable_tasks {
+ // Move to the index which has available slots
+ if idx_slot >= slots.len() {
+ idx_slot = 0;
+ }
+ if slots[idx_slot].slots == 0 {
+ idx_slot = 0;
+ }
+ // Since the slots vector is kept in descending order and the total number of available slots is larger than 0,
+ // we are sure the available slot number at idx_slot is at least 1
+ let mut slot = &mut slots[idx_slot];
+ let executor_id = slot.executor_id.clone();
+ let task_id = *task_id_gen;
+ *task_id_gen += 1;
+ *task_info = Some(create_task_info(executor_id.clone(), task_id));
+
+ let partition = PartitionId {
+ job_id: job_id.clone(),
+ stage_id: running_stage.stage_id,
+ partition_id,
+ };
+ let task_desc = TaskDescription {
+ session_id: session_id.clone(),
+ partition,
+ stage_attempt_num: running_stage.stage_attempt_num,
+ task_id,
+ task_attempt: running_stage.task_failure_numbers[partition_id],
+ plan: running_stage.plan.clone(),
+ };
+ schedulable_tasks.push((executor_id, task_desc));
+
+ idx_slot += 1;
+ slot.slots -= 1;
+ total_slots -= 1;
+ if total_slots == 0 {
+ return schedulable_tasks;
+ }
+ }
+ }
+ }
- let mut last_updated_idx = 0usize;
+ schedulable_tasks
+}
- loop {
- let n_before = n;
- for (idx, data) in slots.iter_mut().enumerate() {
- if n == 0 {
- break;
- }
+#[cfg(test)]
+mod test {
+ use crate::cluster::{bind_task_bias, bind_task_round_robin, BoundTask};
+ use crate::state::execution_graph::ExecutionGraph;
+ use crate::state::task_manager::JobInfoCache;
+ use crate::test_utils::{mock_completed_task, test_aggregation_plan_with_job_id};
+ use ballista_core::error::Result;
+ use ballista_core::serde::protobuf::AvailableTaskSlots;
+ use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};
+ use std::collections::HashMap;
+ use std::sync::Arc;
+
+ #[tokio::test]
+ async fn test_bind_task_bias() -> Result<()> {
+ let active_jobs = mock_active_jobs().await?;
+ let mut available_slots = mock_available_slots();
+ let available_slots_ref: Vec<&mut AvailableTaskSlots> =
+ available_slots.iter_mut().collect();
+
+ let bound_tasks =
+ bind_task_bias(available_slots_ref, Arc::new(active_jobs), |_| false).await;
+ assert_eq!(9, bound_tasks.len());
+
+ let result = get_result(bound_tasks);
+
+ let mut expected = Vec::new();
+ {
+ let mut expected0 = HashMap::new();
+
+ let mut entry_a = HashMap::new();
+ entry_a.insert("executor_3".to_string(), 2);
+ let mut entry_b = HashMap::new();
+ entry_b.insert("executor_3".to_string(), 5);
+ entry_b.insert("executor_2".to_string(), 2);
+
+ expected0.insert("job_a".to_string(), entry_a);
+ expected0.insert("job_b".to_string(), entry_b);
+
+ expected.push(expected0);
+ }
+ {
+ let mut expected0 = HashMap::new();
- // Since the vector is sorted in descending order,
- // if finding one executor has not enough slots, the following will have not enough, either
- if data.slots == 0 {
- break;
- }
+ let mut entry_b = HashMap::new();
+ entry_b.insert("executor_3".to_string(), 7);
+ let mut entry_a = HashMap::new();
+ entry_a.insert("executor_2".to_string(), 2);
- reservations.push(ExecutorReservation::new_free(data.executor_id.clone()));
- data.slots -= 1;
- n -= 1;
+ expected0.insert("job_a".to_string(), entry_a);
+ expected0.insert("job_b".to_string(), entry_b);
- if idx >= last_updated_idx {
- last_updated_idx = idx + 1;
- }
+ expected.push(expected0);
}
- if n_before == n {
- break;
+ assert!(
+ expected.contains(&result),
+ "The result {:?} is not as expected {:?}",
+ result,
+ expected
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_bind_task_round_robin() -> Result<()> {
+ let active_jobs = mock_active_jobs().await?;
+ let mut available_slots = mock_available_slots();
+ let available_slots_ref: Vec<&mut AvailableTaskSlots> =
+ available_slots.iter_mut().collect();
+
+ let bound_tasks =
+ bind_task_round_robin(available_slots_ref, Arc::new(active_jobs), |_| false)
+ .await;
+ assert_eq!(9, bound_tasks.len());
+
+ let result = get_result(bound_tasks);
+
+ let mut expected = Vec::new();
+ {
+ let mut expected0 = HashMap::new();
+
+ let mut entry_a = HashMap::new();
+ entry_a.insert("executor_3".to_string(), 1);
+ entry_a.insert("executor_2".to_string(), 1);
+ let mut entry_b = HashMap::new();
+ entry_b.insert("executor_1".to_string(), 3);
+ entry_b.insert("executor_3".to_string(), 2);
+ entry_b.insert("executor_2".to_string(), 2);
+
+ expected0.insert("job_a".to_string(), entry_a);
+ expected0.insert("job_b".to_string(), entry_b);
+
+ expected.push(expected0);
+ }
+ {
+ let mut expected0 = HashMap::new();
+
+ let mut entry_b = HashMap::new();
+ entry_b.insert("executor_3".to_string(), 3);
+ entry_b.insert("executor_2".to_string(), 2);
+ entry_b.insert("executor_1".to_string(), 2);
+ let mut entry_a = HashMap::new();
+ entry_a.insert("executor_2".to_string(), 1);
+ entry_a.insert("executor_1".to_string(), 1);
+
+ expected0.insert("job_a".to_string(), entry_a);
+ expected0.insert("job_b".to_string(), entry_b);
+
+ expected.push(expected0);
}
+
+ assert!(
+ expected.contains(&result),
+ "The result {:?} is not as expected {:?}",
+ result,
+ expected
+ );
+
+ Ok(())
}
- reservations
+ fn get_result(
+ bound_tasks: Vec<BoundTask>,
+ ) -> HashMap<String, HashMap<String, usize>> {
+ let mut result = HashMap::new();
+
+ for bound_task in bound_tasks {
+ let entry = result
+ .entry(bound_task.1.partition.job_id)
+ .or_insert_with(HashMap::new);
+ let n = entry.entry(bound_task.0).or_insert_with(|| 0);
+ *n += 1;
+ }
+
+ result
+ }
+
+ async fn mock_active_jobs() -> Result<HashMap<String, JobInfoCache>> {
+ let num_partition = 8usize;
+
+ let graph_a = mock_graph("job_a", num_partition, 2).await?;
+
+ let graph_b = mock_graph("job_b", num_partition, 7).await?;
+
+ let mut active_jobs = HashMap::new();
+ active_jobs.insert(graph_a.job_id().to_string(), JobInfoCache::new(graph_a));
+ active_jobs.insert(graph_b.job_id().to_string(), JobInfoCache::new(graph_b));
+
+ Ok(active_jobs)
+ }
+
+ async fn mock_graph(
+ job_id: &str,
+ num_partition: usize,
+ num_pending_task: usize,
+ ) -> Result<ExecutionGraph> {
+ let mut graph = test_aggregation_plan_with_job_id(num_partition, job_id).await;
+ let executor = ExecutorMetadata {
+ id: "executor_0".to_string(),
+ host: "localhost".to_string(),
+ port: 50051,
+ grpc_port: 50052,
+ specification: ExecutorSpecification { task_slots: 32 },
+ };
+
+ if let Some(task) = graph.pop_next_task(&executor.id)? {
+ let task_status = mock_completed_task(task, &executor.id);
+ graph.update_task_status(&executor, vec![task_status], 1, 1)?;
+ }
+
+ graph.revive();
+
+ for _i in 0..num_partition - num_pending_task {
+ if let Some(task) = graph.pop_next_task(&executor.id)? {
+ let task_status = mock_completed_task(task, &executor.id);
+ graph.update_task_status(&executor, vec![task_status], 1, 1)?;
+ }
+ }
+
+ Ok(graph)
+ }
+
+ fn mock_available_slots() -> Vec<AvailableTaskSlots> {
+ vec![
+ AvailableTaskSlots {
+ executor_id: "executor_1".to_string(),
+ slots: 3,
+ },
+ AvailableTaskSlots {
+ executor_id: "executor_2".to_string(),
+ slots: 5,
+ },
+ AvailableTaskSlots {
+ executor_id: "executor_3".to_string(),
+ slots: 7,
+ },
+ ]
+ }
}
diff --git a/ballista/scheduler/src/cluster/storage/mod.rs b/ballista/scheduler/src/cluster/storage/mod.rs
index f4080468..6aa049c0 100644
--- a/ballista/scheduler/src/cluster/storage/mod.rs
+++ b/ballista/scheduler/src/cluster/storage/mod.rs
@@ -114,17 +114,6 @@ pub trait KeyValueStore: Send + Sync + Clone + 'static {
async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()>;
}
-/// Method of distributing tasks to available executor slots
-#[derive(Debug, Clone, Copy)]
-pub enum TaskDistribution {
- /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
- /// as are currently available
- Bias,
- /// Distributed tasks evenely across executors. This will try and iterate through available executors
- /// and assign one task to each executor until all tasks are assigned.
- RoundRobin,
-}
-
/// A Watch is a cancelable stream of put or delete events in the [StateBackendClient]
#[async_trait]
pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
diff --git a/ballista/scheduler/src/cluster/test/mod.rs b/ballista/scheduler/src/cluster/test/mod.rs
deleted file mode 100644
index b9056bfd..00000000
--- a/ballista/scheduler/src/cluster/test/mod.rs
+++ /dev/null
@@ -1,587 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cluster::{ClusterState, JobState, JobStateEvent, TaskDistribution};
-use crate::scheduler_server::timestamp_millis;
-use crate::state::execution_graph::ExecutionGraph;
-use crate::state::executor_manager::ExecutorReservation;
-use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
-use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{executor_status, JobStatus};
-use ballista_core::serde::scheduler::{
- ExecutorData, ExecutorMetadata, ExecutorSpecification,
-};
-use futures::StreamExt;
-use itertools::Itertools;
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::RwLock;
-
-pub struct ClusterStateTest<S: ClusterState> {
- state: Arc<S>,
- reservations: Vec<ExecutorReservation>,
- total_task_slots: u32,
-}
-
-impl<S: ClusterState> ClusterStateTest<S> {
- pub async fn new(state: S) -> Result<Self> {
- Ok(Self {
- state: Arc::new(state),
- reservations: vec![],
- total_task_slots: 0,
- })
- }
-
- pub async fn register_executor(
- mut self,
- executor_id: &str,
- task_slots: u32,
- ) -> Result<Self> {
- self.state
- .register_executor(
- ExecutorMetadata {
- id: executor_id.to_string(),
- host: executor_id.to_string(),
- port: 0,
- grpc_port: 0,
- specification: ExecutorSpecification { task_slots },
- },
- ExecutorData {
- executor_id: executor_id.to_string(),
- total_task_slots: task_slots,
- available_task_slots: task_slots,
- },
- false,
- )
- .await?;
-
- self.total_task_slots += task_slots;
-
- Ok(self)
- }
-
- pub async fn remove_executor(self, executor_id: &str) -> Result<Self> {
- self.state.remove_executor(executor_id).await?;
-
- Ok(self)
- }
-
- pub async fn assert_live_executor(
- self,
- executor_id: &str,
- task_slots: u32,
- ) -> Result<Self> {
- let executor = self.state.get_executor_metadata(executor_id).await;
- assert!(
- executor.is_ok(),
- "Metadata for executor {} not found in state",
- executor_id
- );
- assert_eq!(
- executor.unwrap().specification.task_slots,
- task_slots,
- "Unexpected number of task slots for executor"
- );
-
- // Heratbeat stream is async so wait up to 500ms for it to show up
- await_condition(Duration::from_millis(50), 10, || {
- let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or(
- false,
- |heartbeat| {
- matches!(
- heartbeat.status,
- Some(ballista_core::serde::generated::ballista::ExecutorStatus {
- status: Some(executor_status::Status::Active(_))
- })
- )
- },
- );
-
- futures::future::ready(Ok(found_heartbeat))
- })
- .await?;
-
- Ok(self)
- }
-
- pub async fn assert_dead_executor(self, executor_id: &str) -> Result<Self> {
- // Heratbeat stream is async so wait up to 500ms for it to show up
- await_condition(Duration::from_millis(50), 10, || {
- let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or(
- true,
- |heartbeat| {
- matches!(
- heartbeat.status,
- Some(ballista_core::serde::generated::ballista::ExecutorStatus {
- status: Some(executor_status::Status::Dead(_))
- })
- )
- },
- );
-
- futures::future::ready(Ok(found_heartbeat))
- })
- .await?;
-
- Ok(self)
- }
-
- pub async fn try_reserve_slots(
- mut self,
- num_slots: u32,
- distribution: TaskDistribution,
- filter: Option<Vec<String>>,
- exact: bool,
- ) -> Result<Self> {
- let filter = filter.map(|f| f.into_iter().collect::<HashSet<String>>());
- let reservations = if exact {
- self.state
- .reserve_slots_exact(num_slots, distribution, filter)
- .await?
- } else {
- self.state
- .reserve_slots(num_slots, distribution, filter)
- .await?
- };
-
- self.reservations.extend(reservations);
-
- Ok(self)
- }
-
- pub async fn cancel_reservations(mut self, num_slots: usize) -> Result<Self> {
- if self.reservations.len() < num_slots {
- return Err(BallistaError::General(format!(
- "Not enough reservations to cancel, expected {} but found {}",
- num_slots,
- self.reservations.len()
- )));
- }
-
- let to_keep = self.reservations.split_off(num_slots);
-
- self.state
- .cancel_reservations(std::mem::take(&mut self.reservations))
- .await?;
-
- self.reservations = to_keep;
-
- Ok(self)
- }
-
- pub fn assert_open_reservations(self, n: usize) -> Self {
- assert_eq!(
- self.reservations.len(),
- n,
- "Expectedt {} open reservations but found {}",
- n,
- self.reservations.len()
- );
- self
- }
-
- pub fn assert_open_reservations_with<F: Fn(&ExecutorReservation) -> bool>(
- self,
- n: usize,
- predicate: F,
- ) -> Self {
- assert_eq!(
- self.reservations.len(),
- n,
- "Expected {} open reservations but found {}",
- n,
- self.reservations.len()
- );
-
- for res in &self.reservations {
- assert!(predicate(res), "Predicate failed on reservation {:?}", res);
- }
- self
- }
-
- pub async fn fuzz_reservation(
- mut self,
- concurrency: usize,
- distribution: TaskDistribution,
- ) -> Result<()> {
- let (sender, mut receiver) = tokio::sync::mpsc::channel(1_000);
-
- let total_slots = self.total_task_slots;
- for _ in 0..concurrency {
- let state = self.state.clone();
- let sender_clone = sender.clone();
- tokio::spawn(async move {
- let mut open_reservations = vec![];
- for i in 0..10 {
- if i % 2 == 0 {
- let to_reserve = rand::random::<u32>() % total_slots;
-
- let reservations = state
- .reserve_slots(to_reserve, distribution, None)
- .await
- .unwrap();
-
- sender_clone
- .send(FuzzEvent::Reserved(reservations.clone()))
- .await
- .unwrap();
-
- open_reservations = reservations;
- } else {
- state
- .cancel_reservations(open_reservations.clone())
- .await
- .unwrap();
- sender_clone
- .send(FuzzEvent::Cancelled(std::mem::take(
- &mut open_reservations,
- )))
- .await
- .unwrap();
- }
- }
- });
- }
-
- drop(sender);
-
- while let Some(event) = receiver.recv().await {
- match event {
- FuzzEvent::Reserved(reservations) => {
- self.reservations.extend(reservations);
- assert!(
- self.reservations.len() <= total_slots as usize,
- "More than total number of slots was reserved"
- );
- }
- FuzzEvent::Cancelled(reservations) => {
- for res in reservations {
- let idx = self
- .reservations
- .iter()
- .find_position(|r| r.executor_id == res.executor_id);
- assert!(idx.is_some(), "Received invalid cancellation, not existing reservation for executor ID {}", res.executor_id);
-
- self.reservations.swap_remove(idx.unwrap().0);
- }
- }
- }
- }
-
- Ok(())
- }
-}
-
-#[derive(Debug, Clone)]
-enum FuzzEvent {
- Reserved(Vec<ExecutorReservation>),
- Cancelled(Vec<ExecutorReservation>),
-}
-
-pub async fn test_fuzz_reservations<S: ClusterState>(
- state: S,
- concurrency: usize,
- distribution: TaskDistribution,
- num_executors: usize,
- task_slots_per_executor: usize,
-) -> Result<()> {
- let mut test = ClusterStateTest::new(state).await?;
-
- for idx in 0..num_executors {
- test = test
- .register_executor(idx.to_string().as_str(), task_slots_per_executor as u32)
- .await?;
- }
-
- test.fuzz_reservation(concurrency, distribution).await
-}
-
-pub async fn test_executor_registration<S: ClusterState>(state: S) -> Result<()> {
- let test = ClusterStateTest::new(state).await?;
-
- test.register_executor("1", 10)
- .await?
- .register_executor("2", 10)
- .await?
- .register_executor("3", 10)
- .await?
- .assert_live_executor("1", 10)
- .await?
- .assert_live_executor("2", 10)
- .await?
- .assert_live_executor("3", 10)
- .await?
- .remove_executor("1")
- .await?
- .assert_dead_executor("1")
- .await?
- .remove_executor("2")
- .await?
- .assert_dead_executor("2")
- .await?
- .remove_executor("3")
- .await?
- .assert_dead_executor("3")
- .await?;
-
- Ok(())
-}
-
-pub async fn test_reservation<S: ClusterState>(
- state: S,
- distribution: TaskDistribution,
-) -> Result<()> {
- let test = ClusterStateTest::new(state).await?;
-
- test.register_executor("1", 10)
- .await?
- .register_executor("2", 10)
- .await?
- .register_executor("3", 10)
- .await?
- .try_reserve_slots(10, distribution, None, false)
- .await?
- .assert_open_reservations(10)
- .cancel_reservations(10)
- .await?
- .try_reserve_slots(30, distribution, None, false)
- .await?
- .assert_open_reservations(30)
- .cancel_reservations(15)
- .await?
- .assert_open_reservations(15)
- .try_reserve_slots(30, distribution, None, false)
- .await?
- .assert_open_reservations(30)
- .cancel_reservations(30)
- .await?
- .assert_open_reservations(0)
- .try_reserve_slots(50, distribution, None, false)
- .await?
- .assert_open_reservations(30)
- .cancel_reservations(30)
- .await?
- .try_reserve_slots(20, distribution, Some(vec!["1".to_string()]), false)
- .await?
- .assert_open_reservations_with(10, |res| res.executor_id == "1")
- .cancel_reservations(10)
- .await?
- .try_reserve_slots(
- 20,
- distribution,
- Some(vec!["2".to_string(), "3".to_string()]),
- false,
- )
- .await?
- .assert_open_reservations_with(20, |res| {
- res.executor_id == "2" || res.executor_id == "3"
- });
-
- Ok(())
-}
-
-pub struct JobStateTest<S: JobState> {
- state: Arc<S>,
- events: Arc<RwLock<Vec<JobStateEvent>>>,
-}
-
-impl<S: JobState> JobStateTest<S> {
- pub async fn new(state: S) -> Result<Self> {
- let events = Arc::new(RwLock::new(vec![]));
-
- let mut event_stream = state.job_state_events().await?;
- let events_clone = events.clone();
- tokio::spawn(async move {
- while let Some(event) = event_stream.next().await {
- let mut guard = events_clone.write().await;
-
- guard.push(event);
- }
- });
-
- Ok(Self {
- state: Arc::new(state),
- events,
- })
- }
-
- pub async fn queue_job(self, job_id: &str) -> Result<Self> {
- self.state
- .accept_job(job_id, "", timestamp_millis())
- .await?;
- Ok(self)
- }
-
- pub async fn fail_planning(self, job_id: &str) -> Result<Self> {
- self.state
- .fail_unscheduled_job(job_id, "failed planning".to_string())
- .await?;
- Ok(self)
- }
-
- pub async fn assert_queued(self, job_id: &str) -> Result<Self> {
- let status = self.state.get_job_status(job_id).await?;
-
- assert!(status.is_some(), "Queued job {} not found", job_id);
-
- let status = status.unwrap();
- assert!(
- matches!(&status, JobStatus {
- job_id: status_job_id, status: Some(Status::Queued(_)), ..
- } if status_job_id.as_str() == job_id),
- "Expected queued status but found {:?}",
- status
- );
-
- Ok(self)
- }
-
- pub async fn submit_job(self, graph: &ExecutionGraph) -> Result<Self> {
- self.state
- .submit_job(graph.job_id().to_string(), graph)
- .await?;
- Ok(self)
- }
-
- pub async fn assert_job_running(self, job_id: &str) -> Result<Self> {
- let status = self.state.get_job_status(job_id).await?;
-
- assert!(status.is_some(), "Job status not found for {}", job_id);
-
- let status = status.unwrap();
- assert!(
- matches!(&status, JobStatus {
- job_id: status_job_id, status: Some(Status::Running(_)), ..
- } if status_job_id.as_str() == job_id),
- "Expected running status but found {:?}",
- status
- );
-
- Ok(self)
- }
-
- pub async fn update_job(self, graph: &ExecutionGraph) -> Result<Self> {
- self.state.save_job(graph.job_id(), graph).await?;
- Ok(self)
- }
-
- pub async fn assert_job_failed(self, job_id: &str) -> Result<Self> {
- let status = self.state.get_job_status(job_id).await?;
-
- assert!(status.is_some(), "Job status not found for {}", job_id);
-
- let status = status.unwrap();
- assert!(
- matches!(&status, JobStatus {
- job_id: status_job_id, status: Some(Status::Failed(_)), ..
- } if status_job_id.as_str() == job_id),
- "Expected failed status but found {:?}",
- status
- );
-
- Ok(self)
- }
-
- pub async fn assert_job_successful(self, job_id: &str) -> Result<Self> {
- let status = self.state.get_job_status(job_id).await?;
-
- assert!(status.is_some(), "Job status not found for {}", job_id);
- let status = status.unwrap();
- assert!(
- matches!(&status, JobStatus {
- job_id: status_job_id, status: Some(Status::Successful(_)), ..
- } if status_job_id.as_str() == job_id),
- "Expected success status but found {:?}",
- status
- );
-
- Ok(self)
- }
-
- pub async fn assert_event(self, event: JobStateEvent) -> Result<Self> {
- let events = self.events.clone();
- let found = await_condition(Duration::from_millis(50), 10, || async {
- let guard = events.read().await;
-
- Ok(guard.iter().any(|ev| ev == &event))
- })
- .await?;
-
- assert!(found, "Expected event {:?}", event);
-
- Ok(self)
- }
-}
-
-pub async fn test_job_lifecycle<S: JobState>(
- state: S,
- mut graph: ExecutionGraph,
-) -> Result<()> {
- let test = JobStateTest::new(state).await?;
-
- let job_id = graph.job_id().to_string();
-
- let test = test
- .queue_job(&job_id)
- .await?
- .assert_queued(&job_id)
- .await?
- .submit_job(&graph)
- .await?
- .assert_job_running(&job_id)
- .await?;
-
- drain_tasks(&mut graph)?;
- graph.succeed_job()?;
-
- test.update_job(&graph)
- .await?
- .assert_job_successful(&job_id)
- .await?;
-
- Ok(())
-}
-
-pub async fn test_job_planning_failure<S: JobState>(
- state: S,
- graph: ExecutionGraph,
-) -> Result<()> {
- let test = JobStateTest::new(state).await?;
-
- let job_id = graph.job_id().to_string();
-
- test.queue_job(&job_id)
- .await?
- .fail_planning(&job_id)
- .await?
- .assert_job_failed(&job_id)
- .await?;
-
- Ok(())
-}
-
-fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
- let executor = mock_executor("executor-id1".to_string());
- while let Some(task) = graph.pop_next_task(&executor.id)? {
- let task_status = mock_completed_task(task, &executor.id);
- graph.update_task_status(&executor, vec![task_status], 1, 1)?;
- }
-
- Ok(())
-}
diff --git a/ballista/scheduler/src/cluster/test_util/mod.rs b/ballista/scheduler/src/cluster/test_util/mod.rs
new file mode 100644
index 00000000..64da6c60
--- /dev/null
+++ b/ballista/scheduler/src/cluster/test_util/mod.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cluster::{JobState, JobStateEvent};
+use crate::scheduler_server::timestamp_millis;
+use crate::state::execution_graph::ExecutionGraph;
+use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
+use ballista_core::error::Result;
+use ballista_core::serde::protobuf::job_status::Status;
+use ballista_core::serde::protobuf::JobStatus;
+use futures::StreamExt;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::RwLock;
+
+pub struct JobStateTest<S: JobState> {
+ state: Arc<S>,
+ events: Arc<RwLock<Vec<JobStateEvent>>>,
+}
+
+impl<S: JobState> JobStateTest<S> {
+ pub async fn new(state: S) -> Result<Self> {
+ let events = Arc::new(RwLock::new(vec![]));
+
+ let mut event_stream = state.job_state_events().await?;
+ let events_clone = events.clone();
+ tokio::spawn(async move {
+ while let Some(event) = event_stream.next().await {
+ let mut guard = events_clone.write().await;
+
+ guard.push(event);
+ }
+ });
+
+ Ok(Self {
+ state: Arc::new(state),
+ events,
+ })
+ }
+
+ pub fn queue_job(self, job_id: &str) -> Result<Self> {
+ self.state.accept_job(job_id, "", timestamp_millis())?;
+ Ok(self)
+ }
+
+ pub async fn fail_planning(self, job_id: &str) -> Result<Self> {
+ self.state
+ .fail_unscheduled_job(job_id, "failed planning".to_string())
+ .await?;
+ Ok(self)
+ }
+
+ pub async fn assert_queued(self, job_id: &str) -> Result<Self> {
+ let status = self.state.get_job_status(job_id).await?;
+
+ assert!(status.is_some(), "Queued job {} not found", job_id);
+
+ let status = status.unwrap();
+ assert!(
+ matches!(&status, JobStatus {
+ job_id: status_job_id, status: Some(Status::Queued(_)), ..
+ } if status_job_id.as_str() == job_id),
+ "Expected queued status but found {:?}",
+ status
+ );
+
+ Ok(self)
+ }
+
+ pub async fn submit_job(self, graph: &ExecutionGraph) -> Result<Self> {
+ self.state
+ .submit_job(graph.job_id().to_string(), graph)
+ .await?;
+ Ok(self)
+ }
+
+ pub async fn assert_job_running(self, job_id: &str) -> Result<Self> {
+ let status = self.state.get_job_status(job_id).await?;
+
+ assert!(status.is_some(), "Job status not found for {}", job_id);
+
+ let status = status.unwrap();
+ assert!(
+ matches!(&status, JobStatus {
+ job_id: status_job_id, status: Some(Status::Running(_)), ..
+ } if status_job_id.as_str() == job_id),
+ "Expected running status but found {:?}",
+ status
+ );
+
+ Ok(self)
+ }
+
+ pub async fn update_job(self, graph: &ExecutionGraph) -> Result<Self> {
+ self.state.save_job(graph.job_id(), graph).await?;
+ Ok(self)
+ }
+
+ pub async fn assert_job_failed(self, job_id: &str) -> Result<Self> {
+ let status = self.state.get_job_status(job_id).await?;
+
+ assert!(status.is_some(), "Job status not found for {}", job_id);
+
+ let status = status.unwrap();
+ assert!(
+ matches!(&status, JobStatus {
+ job_id: status_job_id, status: Some(Status::Failed(_)), ..
+ } if status_job_id.as_str() == job_id),
+ "Expected failed status but found {:?}",
+ status
+ );
+
+ Ok(self)
+ }
+
+ pub async fn assert_job_successful(self, job_id: &str) -> Result<Self> {
+ let status = self.state.get_job_status(job_id).await?;
+
+ assert!(status.is_some(), "Job status not found for {}", job_id);
+ let status = status.unwrap();
+ assert!(
+ matches!(&status, JobStatus {
+ job_id: status_job_id, status: Some(Status::Successful(_)), ..
+ } if status_job_id.as_str() == job_id),
+ "Expected success status but found {:?}",
+ status
+ );
+
+ Ok(self)
+ }
+
+ pub async fn assert_event(self, event: JobStateEvent) -> Result<Self> {
+ let events = self.events.clone();
+ let found = await_condition(Duration::from_millis(50), 10, || async {
+ let guard = events.read().await;
+
+ Ok(guard.iter().any(|ev| ev == &event))
+ })
+ .await?;
+
+ assert!(found, "Expected event {:?}", event);
+
+ Ok(self)
+ }
+}
+
+pub async fn test_job_lifecycle<S: JobState>(
+ state: S,
+ mut graph: ExecutionGraph,
+) -> Result<()> {
+ let test = JobStateTest::new(state).await?;
+
+ let job_id = graph.job_id().to_string();
+
+ let test = test
+ .queue_job(&job_id)?
+ .assert_queued(&job_id)
+ .await?
+ .submit_job(&graph)
+ .await?
+ .assert_job_running(&job_id)
+ .await?;
+
+ drain_tasks(&mut graph)?;
+ graph.succeed_job()?;
+
+ test.update_job(&graph)
+ .await?
+ .assert_job_successful(&job_id)
+ .await?;
+
+ Ok(())
+}
+
+pub async fn test_job_planning_failure<S: JobState>(
+ state: S,
+ graph: ExecutionGraph,
+) -> Result<()> {
+ let test = JobStateTest::new(state).await?;
+
+ let job_id = graph.job_id().to_string();
+
+ test.queue_job(&job_id)?
+ .fail_planning(&job_id)
+ .await?
+ .assert_job_failed(&job_id)
+ .await?;
+
+ Ok(())
+}
+
+fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
+ let executor = mock_executor("executor-id1".to_string());
+ while let Some(task) = graph.pop_next_task(&executor.id)? {
+ let task_status = mock_completed_task(task, &executor.id);
+ graph.update_task_status(&executor, vec![task_status], 1, 1)?;
+ }
+
+ Ok(())
+}
diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index ed828ec0..99b938a1 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -15,14 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use crate::state::executor_manager::ExecutorReservation;
use std::fmt::{Debug, Formatter};
use datafusion::logical_expr::LogicalPlan;
use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::TaskStatus;
-use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use std::sync::Arc;
@@ -37,12 +35,8 @@ pub enum QueryStageSchedulerEvent {
},
JobSubmitted {
job_id: String,
- job_name: String,
- session_id: String,
queued_at: u64,
submitted_at: u64,
- resubmit: bool,
- plan: Arc<dyn ExecutionPlan>,
},
// For a job which failed during planning
JobPlanningFailed {
@@ -67,7 +61,7 @@ pub enum QueryStageSchedulerEvent {
JobCancel(String),
JobDataClean(String),
TaskUpdating(String, Vec<TaskStatus>),
- ReservationOffering(Vec<ExecutorReservation>),
+ ReviveOffers,
ExecutorLost(String, Option<String>),
CancelTasks(Vec<RunningTaskInfo>),
}
@@ -80,10 +74,8 @@ impl Debug for QueryStageSchedulerEvent {
} => {
write!(f, "JobQueued : job_id={job_id}, job_name={job_name}.")
}
- QueryStageSchedulerEvent::JobSubmitted {
- job_id, resubmit, ..
- } => {
- write!(f, "JobSubmitted : job_id={job_id}, resubmit={resubmit}.")
+ QueryStageSchedulerEvent::JobSubmitted { job_id, .. } => {
+ write!(f, "JobSubmitted : job_id={job_id}.")
}
QueryStageSchedulerEvent::JobPlanningFailed {
job_id,
@@ -129,8 +121,8 @@ impl Debug for QueryStageSchedulerEvent {
QueryStageSchedulerEvent::TaskUpdating(job_id, status) => {
write!(f, "TaskUpdating : job_id={job_id}, status:[{status:?}].")
}
- QueryStageSchedulerEvent::ReservationOffering(reservations) => {
- write!(f, "ReservationOffering : reservations:[{reservations:?}].")
+ QueryStageSchedulerEvent::ReviveOffers => {
+ write!(f, "ReviveOffers.")
}
QueryStageSchedulerEvent::ExecutorLost(job_id, reason) => {
write!(f, "ExecutorLost : job_id={job_id}, reason:[{reason:?}].")
diff --git a/ballista/scheduler/src/scheduler_server/external_scaler.rs b/ballista/scheduler/src/scheduler_server/external_scaler.rs
index 104037e2..f2feb243 100644
--- a/ballista/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/scheduler/src/scheduler_server/external_scaler.rs
@@ -25,7 +25,8 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
use tonic::{Request, Response};
-const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
+const PENDING_JOBS_METRIC_NAME: &str = "pending_jobs";
+const RUNNING_JOBS_METRIC_NAME: &str = "running_jobs";
#[tonic::async_trait]
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
@@ -44,8 +45,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
Ok(Response::new(GetMetricSpecResponse {
metric_specs: vec![MetricSpec {
- metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
- target_size: 1,
+ metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
+ target_size: 0,
}],
}))
}
@@ -55,10 +56,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
_request: Request<GetMetricsRequest>,
) -> Result<Response<GetMetricsResponse>, tonic::Status> {
Ok(Response::new(GetMetricsResponse {
- metric_values: vec![MetricValue {
- metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
- metric_value: self.pending_tasks() as i64,
- }],
+ metric_values: vec![
+ MetricValue {
+ metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
+ metric_value: self.pending_job_number() as i64,
+ },
+ MetricValue {
+ metric_name: RUNNING_JOBS_METRIC_NAME.to_string(),
+ metric_value: self.running_job_number() as i64,
+ },
+ ],
}))
}
}
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 485afa8c..f1230c71 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -23,15 +23,16 @@ use std::convert::TryInto;
use ballista_core::serde::protobuf::executor_registration::OptionalHost;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
- execute_query_failure_result, execute_query_result, CancelJobParams, CancelJobResult,
- CleanJobDataParams, CleanJobDataResult, CreateSessionParams, CreateSessionResult,
- ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult,
- ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams,
- ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
- GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
- PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
- RemoveSessionParams, RemoveSessionResult, UpdateSessionParams, UpdateSessionResult,
- UpdateTaskStatusParams, UpdateTaskStatusResult,
+ execute_query_failure_result, execute_query_result, AvailableTaskSlots,
+ CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
+ CreateSessionParams, CreateSessionResult, ExecuteQueryFailureResult,
+ ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat,
+ ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
+ GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+ HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+ RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult,
+ UpdateSessionParams, UpdateSessionResult, UpdateTaskStatusParams,
+ UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
@@ -46,13 +47,14 @@ use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use std::ops::Deref;
use std::sync::Arc;
+use crate::cluster::{bind_task_bias, bind_task_round_robin};
+use crate::config::TaskDistribution;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use datafusion::prelude::SessionContext;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
use crate::scheduler_server::SchedulerServer;
-use crate::state::executor_manager::ExecutorReservation;
#[tonic::async_trait]
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
@@ -76,63 +78,69 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
} = request.into_inner()
{
trace!("Received poll_work request for {:?}", metadata);
- let metadata = ExecutorMetadata {
- id: metadata.id,
- host: metadata
- .optional_host
- .map(|h| match h {
- OptionalHost::Host(host) => host,
- })
- .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
- port: metadata.port as u16,
- grpc_port: metadata.grpc_port as u16,
- specification: metadata.specification.unwrap().into(),
- };
+ let executor_id = metadata.id.clone();
- self.state
- .executor_manager
- .save_executor_metadata(metadata.clone())
- .await
- .map_err(|e| {
- let msg = format!("Could not save executor metadata: {e}");
- error!("{}", msg);
- Status::internal(msg)
- })?;
+ // It's not necessary.
+ // It's only for the scheduler to have a picture of the whole executor cluster.
+ {
+ let metadata = ExecutorMetadata {
+ id: metadata.id,
+ host: metadata
+ .optional_host
+ .map(|h| match h {
+ OptionalHost::Host(host) => host,
+ })
+ .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
+ port: metadata.port as u16,
+ grpc_port: metadata.grpc_port as u16,
+ specification: metadata.specification.unwrap().into(),
+ };
+ if let Err(e) = self
+ .state
+ .executor_manager
+ .save_executor_metadata(metadata)
+ .await
+ {
+ warn!("Could not save executor metadata: {:?}", e);
+ }
+ }
- self.update_task_status(&metadata.id, task_status)
+ self.update_task_status(&executor_id, task_status)
.await
.map_err(|e| {
let msg = format!(
"Fail to update tasks status from executor {:?} due to {:?}",
- &metadata.id, e
+ &executor_id, e
);
error!("{}", msg);
Status::internal(msg)
})?;
- // Find `num_free_slots` next tasks when available
- let mut next_tasks = vec![];
- let reservations = vec![
- ExecutorReservation::new_free(metadata.id.clone());
- num_free_slots as usize
- ];
- if let Ok((mut assignments, _, _)) = self
- .state
- .task_manager
- .fill_reservations(&reservations)
- .await
- {
- while let Some((_, task)) = assignments.pop() {
- match self.state.task_manager.prepare_task_definition(task) {
- Ok(task_definition) => next_tasks.push(task_definition),
- Err(e) => {
- error!("Error preparing task definition: {:?}", e);
- }
+ let mut available_slots = vec![AvailableTaskSlots {
+ executor_id,
+ slots: num_free_slots,
+ }];
+ let available_slots = available_slots.iter_mut().collect();
+ let active_jobs = self.state.task_manager.get_running_job_cache();
+ let schedulable_tasks = match self.state.config.task_distribution {
+ TaskDistribution::Bias => {
+ bind_task_bias(available_slots, active_jobs, |_| false).await
+ }
+ TaskDistribution::RoundRobin => {
+ bind_task_round_robin(available_slots, active_jobs, |_| false).await
+ }
+ };
+
+ let mut tasks = vec![];
+ for (_, task) in schedulable_tasks {
+ match self.state.task_manager.prepare_task_definition(task) {
+ Ok(task_definition) => tasks.push(task_definition),
+ Err(e) => {
+ error!("Error preparing task definition: {:?}", e);
}
}
}
-
- Ok(Response::new(PollWorkResult { tasks: next_tasks }))
+ Ok(Response::new(PollWorkResult { tasks }))
} else {
warn!("Received invalid executor poll_work request");
Err(Status::invalid_argument("Missing metadata in request"))
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 74bf3359..25fa4296 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -20,7 +20,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use ballista_core::error::Result;
use ballista_core::event_loop::{EventLoop, EventSender};
-use ballista_core::serde::protobuf::{StopExecutorParams, TaskStatus};
+use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::SessionState;
@@ -38,7 +38,7 @@ use log::{error, warn};
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
-use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
+use crate::state::executor_manager::ExecutorManager;
use crate::state::task_manager::TaskLauncher;
use crate::state::SchedulerState;
@@ -146,13 +146,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
Ok(())
}
- #[cfg(test)]
- pub(crate) fn query_stage_scheduler(&self) -> Arc<QueryStageScheduler<T, U>> {
- self.query_stage_scheduler.clone()
+ pub fn pending_job_number(&self) -> usize {
+ self.state.task_manager.pending_job_number()
}
- pub(crate) fn pending_tasks(&self) -> usize {
- self.query_stage_scheduler.pending_tasks()
+ pub fn running_job_number(&self) -> usize {
+ self.state.task_manager.running_job_number()
}
pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
@@ -204,13 +203,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.await
}
- pub(crate) async fn offer_reservation(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<()> {
+ pub(crate) async fn revive_offers(&self) -> Result<()> {
self.query_stage_event_loop
.get_sender()?
- .post_event(QueryStageSchedulerEvent::ReservationOffering(reservations))
+ .post_event(QueryStageSchedulerEvent::ReviveOffers)
.await
}
@@ -224,7 +220,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
let expired_executors = state.executor_manager.get_expired_executors();
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
- let executor_manager = state.executor_manager.clone();
let sender_clone = event_sender.clone();
@@ -251,7 +246,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
// If executor is expired, remove it immediately
Self::remove_executor(
- executor_manager,
+ state.executor_manager.clone(),
sender_clone,
&executor_id,
Some(stop_reason.clone()),
@@ -261,31 +256,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
// If executor is not already terminating then stop it. If it is terminating then it should already be shutting
// down and we do not need to do anything here.
if !terminating {
- match state.executor_manager.get_client(&executor_id).await {
- Ok(mut client) => {
- tokio::task::spawn(async move {
- match client
- .stop_executor(StopExecutorParams {
- executor_id,
- reason: stop_reason,
- force: true,
- })
- .await
- {
- Err(error) => {
- warn!(
- "Failed to send stop_executor rpc due to, {}",
- error
- );
- }
- Ok(_value) => {}
- }
- });
- }
- Err(_) => {
- warn!("Executor is already dead, failed to connect to Executor {}", executor_id);
- }
- }
+ state
+ .executor_manager
+ .stop_executor(&executor_id, stop_reason)
+ .await;
}
}
tokio::time::sleep(Duration::from_secs(
@@ -334,16 +308,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
};
// Save the executor to state
- let reservations = self
- .state
+ self.state
.executor_manager
- .register_executor(metadata, executor_data, false)
+ .register_executor(metadata, executor_data)
.await?;
// If we are using push-based scheduling then reserve this executors slots and send
// them for scheduling tasks.
if self.state.config.is_push_staged_scheduling() {
- self.offer_reservation(reservations).await?;
+ self.revive_offers().await?;
}
Ok(())
@@ -412,7 +385,7 @@ mod test {
scheduler
.state
.executor_manager
- .register_executor(executor_metadata, executor_data, false)
+ .register_executor(executor_metadata, executor_data)
.await?;
}
@@ -430,23 +403,15 @@ mod test {
scheduler
.state
.task_manager
- .queue_job(job_id, "", timestamp_millis())
- .await?;
+ .queue_job(job_id, "", timestamp_millis())?;
- // Plan job
- let plan = scheduler
+ // Submit job
+ scheduler
.state
- .plan_job(job_id, ctx.clone(), &plan)
+ .submit_job(job_id, "", ctx, &plan, 0)
.await
.expect("submitting plan");
- //Submit job plan
- scheduler
- .state
- .task_manager
- .submit_job(job_id, "", &ctx.session_id(), plan, 0)
- .await?;
-
// Refresh the ExecutionGraph
while let Some(graph) = scheduler
.state
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 1cd611b8..138f7090 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -35,7 +34,6 @@ use tokio::time::Instant;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::state::executor_manager::ExecutorReservation;
use crate::state::SchedulerState;
pub(crate) struct QueryStageScheduler<
@@ -44,7 +42,6 @@ pub(crate) struct QueryStageScheduler<
> {
state: Arc<SchedulerState<T, U>>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
- pending_tasks: AtomicUsize,
config: Arc<SchedulerConfig>,
}
@@ -57,21 +54,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
Self {
state,
metrics_collector,
- pending_tasks: AtomicUsize::default(),
config,
}
}
- pub(crate) fn set_pending_tasks(&self, tasks: usize) {
- self.pending_tasks.store(tasks, Ordering::SeqCst);
- self.metrics_collector
- .set_pending_tasks_queue_size(tasks as u64);
- }
-
- pub(crate) fn pending_tasks(&self) -> usize {
- self.pending_tasks.load(Ordering::SeqCst)
- }
-
pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
self.metrics_collector.as_ref()
}
@@ -99,7 +85,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
if self.config.scheduler_event_expected_processing_duration > 0 {
time_recorder = Some((Instant::now(), event.clone()));
};
- let tx_event = EventSender::new(tx_event.clone());
+ let event_sender = EventSender::new(tx_event.clone());
match event {
QueryStageSchedulerEvent::JobQueued {
job_id,
@@ -110,127 +96,55 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
} => {
info!("Job {} queued with name {:?}", job_id, job_name);
- self.state
+ if let Err(e) = self
+ .state
.task_manager
.queue_job(&job_id, &job_name, queued_at)
- .await?;
+ {
+ error!("Fail to queue job {} due to {:?}", job_id, e);
+ return Ok(());
+ }
let state = self.state.clone();
tokio::spawn(async move {
- let event =
- match state.plan_job(&job_id, session_ctx.clone(), &plan).await {
- Ok(plan) => QueryStageSchedulerEvent::JobSubmitted {
- job_id,
- job_name,
- session_id: session_ctx.session_id(),
- queued_at,
- submitted_at: timestamp_millis(),
- resubmit: false,
- plan,
- },
- Err(error) => {
- let fail_message =
- format!("Error planning job {job_id}: {error:?}");
- error!("{}", &fail_message);
- QueryStageSchedulerEvent::JobPlanningFailed {
- job_id,
- fail_message,
- queued_at,
- failed_at: timestamp_millis(),
- }
- }
- };
- if let Err(e) = tx_event.post_event(event).await {
+ let event = if let Err(e) = state
+ .submit_job(&job_id, &job_name, session_ctx, &plan, queued_at)
+ .await
+ {
+ let fail_message = format!("Error planning job {job_id}: {e:?}");
+ error!("{}", &fail_message);
+ QueryStageSchedulerEvent::JobPlanningFailed {
+ job_id,
+ fail_message,
+ queued_at,
+ failed_at: timestamp_millis(),
+ }
+ } else {
+ QueryStageSchedulerEvent::JobSubmitted {
+ job_id,
+ queued_at,
+ submitted_at: timestamp_millis(),
+ }
+ };
+ if let Err(e) = event_sender.post_event(event).await {
error!("Fail to send event due to {}", e);
}
});
}
QueryStageSchedulerEvent::JobSubmitted {
job_id,
- job_name,
- session_id,
queued_at,
submitted_at,
- resubmit,
- plan,
} => {
- if !resubmit {
- self.metrics_collector.record_submitted(
- &job_id,
- queued_at,
- submitted_at,
- );
- self.state
- .task_manager
- .submit_job(
- job_id.as_str(),
- job_name.as_str(),
- session_id.as_str(),
- plan.clone(),
- queued_at,
- )
- .await?;
- info!("Job {} submitted", job_id);
- } else {
- debug!("Job {} resubmitted", job_id);
- }
+ self.metrics_collector
+ .record_submitted(&job_id, queued_at, submitted_at);
+
+ info!("Job {} submitted", job_id);
if self.state.config.is_push_staged_scheduling() {
- let available_tasks = self
- .state
- .task_manager
- .get_available_task_count(&job_id)
+ event_sender
+ .post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
-
- let reservations: Vec<ExecutorReservation> = self
- .state
- .executor_manager
- .reserve_slots(available_tasks as u32)
- .await?
- .into_iter()
- .map(|res| res.assign(job_id.clone()))
- .collect();
-
- if reservations.is_empty()
- && self.config.job_resubmit_interval_ms.is_some()
- {
- let wait_ms = self.config.job_resubmit_interval_ms.unwrap();
-
- debug!(
- "No task slots reserved for job {job_id}, resubmitting after {wait_ms}ms"
- );
-
- tokio::task::spawn(async move {
- tokio::time::sleep(Duration::from_millis(wait_ms)).await;
-
- if let Err(e) = tx_event
- .post_event(QueryStageSchedulerEvent::JobSubmitted {
- job_id,
- job_name,
- session_id,
- queued_at,
- submitted_at,
- resubmit: true,
- plan: plan.clone(),
- })
- .await
- {
- error!("error resubmitting job: {}", e);
- }
- });
- } else {
- debug!(
- "Reserved {} task slots for submitted job {}",
- reservations.len(),
- job_id
- );
-
- tx_event
- .post_event(QueryStageSchedulerEvent::ReservationOffering(
- reservations,
- ))
- .await?;
- }
}
}
QueryStageSchedulerEvent::JobPlanningFailed {
@@ -243,10 +157,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.record_failed(&job_id, queued_at, failed_at);
error!("Job {} failed: {}", job_id, fail_message);
- self.state
+ if let Err(e) = self
+ .state
.task_manager
.fail_unscheduled_job(&job_id, fail_message)
- .await?;
+ .await
+ {
+ error!(
+ "Fail to invoke fail_unscheduled_job for job {} due to {:?}",
+ job_id, e
+ );
+ }
}
QueryStageSchedulerEvent::JobFinished {
job_id,
@@ -257,7 +178,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.record_completed(&job_id, queued_at, completed_at);
info!("Job {} success", job_id);
- self.state.task_manager.succeed_job(&job_id).await?;
+ if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
+ error!(
+ "Fail to invoke succeed_job for job {} due to {:?}",
+ job_id, e
+ );
+ }
self.state.clean_up_successful_job(job_id);
}
QueryStageSchedulerEvent::JobRunningFailed {
@@ -270,34 +196,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.record_failed(&job_id, queued_at, failed_at);
error!("Job {} running failed", job_id);
- let (running_tasks, _pending_tasks) = self
+ match self
.state
.task_manager
.abort_job(&job_id, fail_message)
- .await?;
-
- if !running_tasks.is_empty() {
- tx_event
- .post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
- .await?;
+ .await
+ {
+ Ok((running_tasks, _pending_tasks)) => {
+ if !running_tasks.is_empty() {
+ event_sender
+ .post_event(QueryStageSchedulerEvent::CancelTasks(
+ running_tasks,
+ ))
+ .await?;
+ }
+ }
+ Err(e) => {
+ error!(
+ "Fail to invoke abort_job for job {} due to {:?}",
+ job_id, e
+ );
+ }
}
self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::JobUpdated(job_id) => {
info!("Job {} Updated", job_id);
- self.state.task_manager.update_job(&job_id).await?;
+ if let Err(e) = self.state.task_manager.update_job(&job_id).await {
+ error!(
+ "Fail to invoke update_job for job {} due to {:?}",
+ job_id, e
+ );
+ }
}
QueryStageSchedulerEvent::JobCancel(job_id) => {
self.metrics_collector.record_cancelled(&job_id);
info!("Job {} Cancelled", job_id);
- let (running_tasks, _pending_tasks) =
- self.state.task_manager.cancel_job(&job_id).await?;
+ match self.state.task_manager.cancel_job(&job_id).await {
+ Ok((running_tasks, _pending_tasks)) => {
+ event_sender
+ .post_event(QueryStageSchedulerEvent::CancelTasks(
+ running_tasks,
+ ))
+ .await?;
+ }
+ Err(e) => {
+ error!(
+ "Fail to invoke cancel_job for job {} due to {:?}",
+ job_id, e
+ );
+ }
+ }
self.state.clean_up_failed_job(job_id);
-
- tx_event
- .post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
- .await?;
}
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
debug!(
@@ -306,22 +257,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
);
let num_status = tasks_status.len();
+ if self.state.config.is_push_staged_scheduling() {
+ self.state
+ .executor_manager
+ .unbind_tasks(vec![(executor_id.clone(), num_status as u32)])
+ .await?;
+ }
match self
.state
.update_task_statuses(&executor_id, tasks_status)
.await
{
- Ok((stage_events, offers)) => {
+ Ok(stage_events) => {
if self.state.config.is_push_staged_scheduling() {
- tx_event
- .post_event(
- QueryStageSchedulerEvent::ReservationOffering(offers),
- )
+ event_sender
+ .post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
}
for stage_event in stage_events {
- tx_event.post_event(stage_event).await?;
+ event_sender.post_event(stage_event).await?;
}
}
Err(e) => {
@@ -333,27 +288,21 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
}
- QueryStageSchedulerEvent::ReservationOffering(reservations) => {
- let (reservations, pending) =
- self.state.offer_reservation(reservations).await?;
-
- self.set_pending_tasks(pending);
-
- if !reservations.is_empty() {
- tx_event
- .post_event(QueryStageSchedulerEvent::ReservationOffering(
- reservations,
- ))
- .await?;
- }
+ QueryStageSchedulerEvent::ReviveOffers => {
+ self.state.revive_offers(event_sender).await?;
}
QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => {
match self.state.task_manager.executor_lost(&executor_id).await {
Ok(tasks) => {
if !tasks.is_empty() {
- tx_event
- .post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
- .await?;
+ if let Err(e) = self
+ .state
+ .executor_manager
+ .cancel_running_tasks(tasks)
+ .await
+ {
+ warn!("Fail to cancel running tasks due to {:?}", e);
+ }
}
}
Err(e) => {
@@ -365,10 +314,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
QueryStageSchedulerEvent::CancelTasks(tasks) => {
- self.state
+ if let Err(e) = self
+ .state
.executor_manager
.cancel_running_tasks(tasks)
- .await?;
+ .await
+ {
+ warn!("Fail to cancel running tasks due to {:?}", e);
+ }
}
QueryStageSchedulerEvent::JobDataClean(job_id) => {
self.state.executor_manager.clean_up_job_data(job_id);
@@ -397,75 +350,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
#[cfg(test)]
mod tests {
use crate::config::SchedulerConfig;
- use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::test_utils::{await_condition, SchedulerTest, TestMetricsCollector};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
- use ballista_core::event_loop::EventAction;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
- use datafusion::physical_plan::empty::EmptyExec;
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
use tracing_subscriber::EnvFilter;
#[tokio::test]
- async fn test_job_resubmit() -> Result<()> {
- let plan = test_plan(10);
-
- let metrics_collector = Arc::new(TestMetricsCollector::default());
-
- // Set resubmit interval of 1ms
- let mut test = SchedulerTest::new(
- SchedulerConfig::default()
- .with_job_resubmit_interval_ms(1)
- .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
- metrics_collector.clone(),
- 0,
- 0,
- None,
- )
- .await?;
-
- test.submit("job-id", "job-name", &plan).await?;
-
- let query_stage_scheduler = test.query_stage_scheduler();
-
- let (tx, mut rx) = tokio::sync::mpsc::channel::<QueryStageSchedulerEvent>(10);
-
- let event = QueryStageSchedulerEvent::JobSubmitted {
- job_id: "job-id".to_string(),
- job_name: "job-name".to_string(),
- session_id: "session-id".to_string(),
- queued_at: 0,
- submitted_at: 0,
- resubmit: false,
- plan: Arc::new(EmptyExec::new(false, Arc::new(test_schema()))),
- };
-
- // Mock the JobQueued work.
- query_stage_scheduler
- .state
- .task_manager
- .queue_job("job-id", "job-name", 0)
- .await?;
-
- query_stage_scheduler.on_receive(event, &tx, &rx).await?;
-
- let next_event = rx.recv().await.unwrap();
-
- dbg!(next_event.clone());
- assert!(matches!(
- next_event,
- QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if job_id == "job-id" && resubmit
- ));
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_pending_task_metric() -> Result<()> {
+ async fn test_pending_job_metric() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
@@ -484,92 +380,53 @@ mod tests {
)
.await?;
- test.submit("job-1", "", &plan).await?;
-
- // First stage has 10 tasks, one of which should be scheduled immediately
- expect_pending_tasks(&test, 9).await;
-
- test.tick().await?;
-
- // First task completes and another should be scheduler, so we should have 8
- expect_pending_tasks(&test, 8).await;
-
- // Complete the 8 remaining tasks in the first stage
- for _ in 0..8 {
- test.tick().await?;
- }
-
- // The second stage should be resolved so we should have a new pending task
- expect_pending_tasks(&test, 1).await;
-
- // complete the final task
- test.tick().await?;
-
- expect_pending_tasks(&test, 0).await;
-
- // complete the final task
- test.tick().await?;
-
- // Job should be finished now
- let _ = test.await_completion_timeout("job-1", 5_000).await?;
-
- Ok(())
- }
-
- #[ignore]
- #[tokio::test]
- async fn test_pending_task_metric_on_cancellation() -> Result<()> {
- let plan = test_plan(10);
-
- let metrics_collector = Arc::new(TestMetricsCollector::default());
-
- let mut test = SchedulerTest::new(
- SchedulerConfig::default()
- .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
- metrics_collector.clone(),
- 1,
- 1,
- None,
- )
- .await?;
-
- test.submit("job-1", "", &plan).await?;
+ let job_id = "job-1";
- // First stage has 10 tasks, one of which should be scheduled immediately
- expect_pending_tasks(&test, 9).await;
+ test.submit(job_id, "", &plan).await?;
test.tick().await?;
- // First task completes and another should be scheduler, so we should have 8
- expect_pending_tasks(&test, 8).await;
-
- test.cancel("job-1").await?;
+ let pending_jobs = test.pending_job_number();
+ let expected = 0usize;
+ assert_eq!(
+ expected, pending_jobs,
+ "Expected {} pending jobs but found {}",
+ expected, pending_jobs
+ );
- // First task completes and another should be scheduler, so we should have 8
- expect_pending_tasks(&test, 0).await;
+ let running_jobs = test.running_job_number();
+ let expected = 1usize;
+ assert_eq!(
+ expected, running_jobs,
+ "Expected {} running jobs but found {}",
+ expected, running_jobs
+ );
- Ok(())
- }
+ test.cancel(job_id).await?;
- async fn expect_pending_tasks(test: &SchedulerTest, expected: usize) {
- let success = await_condition(Duration::from_millis(500), 20, || {
- let pending_tasks = test.pending_tasks();
+ let expected = 0usize;
+ let success = await_condition(Duration::from_millis(10), 20, || {
+ let running_jobs = test.running_job_number();
- futures::future::ready(Ok(pending_tasks == expected))
+ futures::future::ready(Ok(running_jobs == expected))
})
.await
.unwrap();
-
assert!(
success,
- "Expected {} pending tasks but found {}",
+ "Expected {} running jobs but found {}",
expected,
- test.pending_tasks()
+ test.running_job_number()
);
+
+ Ok(())
}
fn test_plan(partitions: usize) -> LogicalPlan {
- let schema = test_schema();
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Utf8, false),
+ Field::new("gmv", DataType::UInt64, false),
+ ]);
scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
.unwrap()
@@ -578,11 +435,4 @@ mod tests {
.build()
.unwrap()
}
-
- fn test_schema() -> Schema {
- Schema::new(vec![
- Field::new("id", DataType::Utf8, false),
- Field::new("gmv", DataType::UInt64, false),
- ])
- }
}
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index c00363d7..dde1268c 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -48,6 +48,7 @@ use crate::display::print_stage_metrics;
use crate::planner::DistributedPlanner;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::timestamp_millis;
+use crate::state::execution_graph::execution_stage::RunningStage;
pub(crate) use crate::state::execution_graph::execution_stage::{
ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage, TaskInfo,
UnresolvedStage,
@@ -196,8 +197,8 @@ impl ExecutionGraph {
self.session_id.as_str()
}
- pub fn status(&self) -> JobStatus {
- self.status.clone()
+ pub fn status(&self) -> &JobStatus {
+ &self.status
}
pub fn start_time(&self) -> u64 {
@@ -930,6 +931,64 @@ impl ExecutionGraph {
Ok(next_task)
}
+ pub(crate) fn fetch_running_stage(
+ &mut self,
+ black_list: &[usize],
+ ) -> Option<(&mut RunningStage, &mut usize)> {
+ if matches!(
+ self.status,
+ JobStatus {
+ status: Some(job_status::Status::Failed(_)),
+ ..
+ }
+ ) {
+ warn!("Call fetch_runnable_stage on failed Job");
+ return None;
+ }
+
+ let running_stage_id = self.get_running_stage_id(black_list);
+ if let Some(running_stage_id) = running_stage_id {
+ if let Some(ExecutionStage::Running(running_stage)) =
+ self.stages.get_mut(&running_stage_id)
+ {
+ Some((running_stage, &mut self.task_id_gen))
+ } else {
+ warn!("Fail to find running stage with id {running_stage_id}");
+ None
+ }
+ } else {
+ None
+ }
+ }
+
+ fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
+ let mut running_stage_id = self.stages.iter().find_map(|(stage_id, stage)| {
+ if black_list.contains(stage_id) {
+ None
+ } else if let ExecutionStage::Running(stage) = stage {
+ if stage.available_tasks() > 0 {
+ Some(*stage_id)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ });
+
+ // If no available tasks found in the running stage,
+ // try to find a resolved stage and convert it to the running stage
+ if running_stage_id.is_none() {
+ if self.revive() {
+ running_stage_id = self.get_running_stage_id(black_list);
+ } else {
+ running_stage_id = None;
+ }
+ }
+
+ running_stage_id
+ }
+
pub fn update_status(&mut self, status: JobStatus) {
self.status = status;
}
@@ -1430,6 +1489,22 @@ impl Debug for ExecutionGraph {
}
}
+pub(crate) fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
+ TaskInfo {
+ task_id,
+ scheduled_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
+ // Those times will be updated when the task finishes
+ launch_time: 0,
+ start_exec_time: 0,
+ end_exec_time: 0,
+ finish_time: 0,
+ task_status: task_status::Status::Running(RunningTask { executor_id }),
+ }
+}
+
/// Utility for building a set of `ExecutionStage`s from
/// a list of `ShuffleWriterExec`.
///
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 7a6bf461..eed1dba0 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -17,18 +17,19 @@
use std::time::Duration;
-#[cfg(not(test))]
use ballista_core::error::BallistaError;
use ballista_core::error::Result;
use ballista_core::serde::protobuf;
-use crate::cluster::ClusterState;
+use crate::cluster::{BoundTask, ClusterState, ExecutorSlot};
use crate::config::SchedulerConfig;
use crate::state::execution_graph::RunningTaskInfo;
+use crate::state::task_manager::JobInfoCache;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
- executor_status, CancelTasksParams, ExecutorHeartbeat, RemoveJobDataParams,
+ executor_status, CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition,
+ RemoveJobDataParams, StopExecutorParams,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::utils::{create_grpc_client_connection, get_time_before};
@@ -40,42 +41,6 @@ use tonic::transport::Channel;
type ExecutorClients = Arc<DashMap<String, ExecutorGrpcClient<Channel>>>;
-/// Represents a task slot that is reserved (i.e. available for scheduling but not visible to the
-/// rest of the system).
-/// When tasks finish we want to preferentially assign new tasks from the same job, so the reservation
-/// can already be assigned to a particular job ID. In that case, the scheduler will try to schedule
-/// available tasks for that job to the reserved task slot.
-#[derive(Clone, Debug)]
-pub struct ExecutorReservation {
- pub executor_id: String,
- pub job_id: Option<String>,
-}
-
-impl ExecutorReservation {
- pub fn new_free(executor_id: String) -> Self {
- Self {
- executor_id,
- job_id: None,
- }
- }
-
- pub fn new_assigned(executor_id: String, job_id: String) -> Self {
- Self {
- executor_id,
- job_id: Some(job_id),
- }
- }
-
- pub fn assign(mut self, job_id: String) -> Self {
- self.job_id = Some(job_id);
- self
- }
-
- pub fn assigned(&self) -> bool {
- self.job_id.is_some()
- }
-}
-
#[derive(Clone)]
pub struct ExecutorManager {
cluster_state: Arc<dyn ClusterState>,
@@ -101,66 +66,76 @@ impl ExecutorManager {
Ok(())
}
- /// Reserve up to n executor task slots. Once reserved these slots will not be available
- /// for scheduling.
- /// This operation is atomic, so if this method return an Err, no slots have been reserved.
- pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+ /// Bind the ready-to-run tasks from `active_jobs` to available executors.
+ ///
+ /// Only the currently alive executors are offered to the cluster state for binding.
+ pub async fn bind_schedulable_tasks(
+ &self,
+ active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ ) -> Result<Vec<BoundTask>> {
+ if active_jobs.is_empty() {
+ warn!("There's no active jobs for binding tasks");
+ return Ok(vec![]);
+ }
let alive_executors = self.get_alive_executors();
-
- debug!("Alive executors: {alive_executors:?}");
-
+ if alive_executors.is_empty() {
+ warn!("There's no alive executors for binding tasks");
+ return Ok(vec![]);
+ }
self.cluster_state
- .reserve_slots(n, self.config.task_distribution, Some(alive_executors))
+ .bind_schedulable_tasks(
+ self.config.task_distribution,
+ active_jobs,
+ Some(alive_executors),
+ )
.await
}
/// Returned reserved task slots to the pool of available slots. This operation is atomic
/// so either the entire pool of reserved task slots it returned or none are.
- pub async fn cancel_reservations(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<()> {
- self.cluster_state.cancel_reservations(reservations).await
+ pub async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
+ self.cluster_state.unbind_tasks(executor_slots).await
}
/// Send rpc to Executors to cancel the running tasks
pub async fn cancel_running_tasks(&self, tasks: Vec<RunningTaskInfo>) -> Result<()> {
- let mut tasks_to_cancel: HashMap<&str, Vec<protobuf::RunningTaskInfo>> =
+ let mut tasks_to_cancel: HashMap<String, Vec<protobuf::RunningTaskInfo>> =
Default::default();
- for task_info in &tasks {
- if let Some(infos) = tasks_to_cancel.get_mut(task_info.executor_id.as_str()) {
- infos.push(protobuf::RunningTaskInfo {
- task_id: task_info.task_id as u32,
- job_id: task_info.job_id.clone(),
- stage_id: task_info.stage_id as u32,
- partition_id: task_info.partition_id as u32,
- })
- } else {
- tasks_to_cancel.insert(
- task_info.executor_id.as_str(),
- vec![protobuf::RunningTaskInfo {
- task_id: task_info.task_id as u32,
- job_id: task_info.job_id.clone(),
- stage_id: task_info.stage_id as u32,
- partition_id: task_info.partition_id as u32,
- }],
- );
- }
+ for task_info in tasks {
+ let infos = tasks_to_cancel
+ .entry(task_info.executor_id)
+ .or_insert_with(Vec::new);
+ infos.push(protobuf::RunningTaskInfo {
+ task_id: task_info.task_id as u32,
+ job_id: task_info.job_id,
+ stage_id: task_info.stage_id as u32,
+ partition_id: task_info.partition_id as u32,
+ });
}
- for (executor_id, infos) in tasks_to_cancel {
- if let Ok(mut client) = self.get_client(executor_id).await {
- client
- .cancel_tasks(CancelTasksParams { task_infos: infos })
- .await?;
- } else {
- error!(
- "Failed to get client for executor ID {} to cancel tasks",
- executor_id
- )
+ let executor_manager = self.clone();
+ tokio::spawn(async move {
+ for (executor_id, infos) in tasks_to_cancel {
+ if let Ok(mut client) = executor_manager.get_client(&executor_id).await {
+ if let Err(e) = client
+ .cancel_tasks(CancelTasksParams { task_infos: infos })
+ .await
+ {
+ error!(
+ "Fail to cancel tasks for executor ID {} due to {:?}",
+ executor_id, e
+ );
+ }
+ } else {
+ error!(
+ "Failed to get client for executor ID {} to cancel tasks",
+ executor_id
+ )
+ }
}
- }
+ });
+
Ok(())
}
@@ -218,30 +193,6 @@ impl ExecutorManager {
}
}
- pub async fn get_client(
- &self,
- executor_id: &str,
- ) -> Result<ExecutorGrpcClient<Channel>> {
- let client = self.clients.get(executor_id).map(|value| value.clone());
-
- if let Some(client) = client {
- Ok(client)
- } else {
- let executor_metadata = self.get_executor_metadata(executor_id).await?;
- let executor_url = format!(
- "http://{}:{}",
- executor_metadata.host, executor_metadata.grpc_port
- );
- let connection = create_grpc_client_connection(executor_url).await?;
- let client = ExecutorGrpcClient::new(connection);
-
- {
- self.clients.insert(executor_id.to_owned(), client.clone());
- }
- Ok(client)
- }
- }
-
/// Get a list of all executors along with the timestamp of their last recorded heartbeat
pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, Duration)>> {
let heartbeat_timestamps: Vec<(String, u64)> = self
@@ -263,6 +214,7 @@ impl ExecutorManager {
Ok(state)
}
+ /// Get executor metadata for the provided executor ID. Returns an error if the executor does not exist
pub async fn get_executor_metadata(
&self,
executor_id: &str,
@@ -270,56 +222,38 @@ impl ExecutorManager {
self.cluster_state.get_executor_metadata(executor_id).await
}
+ /// This is only used for pull-based task scheduling.
+ ///
+ /// For push-based scheduling, use [`register_executor`] instead.
pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
self.cluster_state.save_executor_metadata(metadata).await
}
- /// Register the executor with the scheduler. This will save the executor metadata and the
- /// executor data to persistent state.
+ /// Register the executor with the scheduler.
///
- /// If `reserve` is true, then any available task slots will be reserved and dispatched for scheduling.
- /// If `reserve` is false, then the executor data will be saved as is.
+ /// This will save the executor metadata and the executor data to persistent state.
///
- /// In general, reserve should be true is the scheduler is using push-based scheduling and false
- /// if the scheduler is using pull-based scheduling.
+ /// It's only used for push-based task scheduling
pub async fn register_executor(
&self,
metadata: ExecutorMetadata,
specification: ExecutorData,
- reserve: bool,
- ) -> Result<Vec<ExecutorReservation>> {
+ ) -> Result<()> {
debug!(
"registering executor {} with {} task slots",
metadata.id, specification.total_task_slots
);
- self.test_scheduler_connectivity(&metadata).await?;
+ ExecutorManager::test_connectivity(&metadata).await?;
- if !reserve {
- self.cluster_state
- .register_executor(metadata, specification.clone(), reserve)
- .await?;
-
- Ok(vec![])
- } else {
- let mut specification = specification;
- let num_slots = specification.available_task_slots as usize;
- let mut reservations: Vec<ExecutorReservation> = vec![];
- for _ in 0..num_slots {
- reservations.push(ExecutorReservation::new_free(metadata.id.clone()));
- }
-
- specification.available_task_slots = 0;
-
- self.cluster_state
- .register_executor(metadata, specification, reserve)
- .await?;
+ self.cluster_state
+ .register_executor(metadata, specification)
+ .await?;
- Ok(reservations)
- }
+ Ok(())
}
- /// Remove the executor within the scheduler.
+ /// Remove the executor from the cluster
pub async fn remove_executor(
&self,
executor_id: &str,
@@ -329,29 +263,55 @@ impl ExecutorManager {
self.cluster_state.remove_executor(executor_id).await
}
- #[cfg(not(test))]
- async fn test_scheduler_connectivity(
+ pub async fn stop_executor(&self, executor_id: &str, stop_reason: String) {
+ let executor_id = executor_id.to_string();
+ match self.get_client(&executor_id).await {
+ Ok(mut client) => {
+ tokio::task::spawn(async move {
+ match client
+ .stop_executor(StopExecutorParams {
+ executor_id: executor_id.to_string(),
+ reason: stop_reason,
+ force: true,
+ })
+ .await
+ {
+ Err(error) => {
+ warn!("Failed to send stop_executor rpc due to, {}", error);
+ }
+ Ok(_value) => {}
+ }
+ });
+ }
+ Err(_) => {
+ warn!(
+ "Executor is already dead, failed to connect to Executor {}",
+ executor_id
+ );
+ }
+ }
+ }
+
+ pub async fn launch_multi_task(
&self,
- metadata: &ExecutorMetadata,
+ executor_id: &str,
+ multi_tasks: Vec<MultiTaskDefinition>,
+ scheduler_id: String,
) -> Result<()> {
- let executor_url = format!("http://{}:{}", metadata.host, metadata.grpc_port);
- debug!("Connecting to executor {:?}", executor_url);
- let _ = protobuf::executor_grpc_client::ExecutorGrpcClient::connect(executor_url)
+ let mut client = self.get_client(executor_id).await?;
+ client
+ .launch_multi_task(protobuf::LaunchMultiTaskParams {
+ multi_tasks,
+ scheduler_id,
+ })
.await
.map_err(|e| {
BallistaError::Internal(format!(
- "Failed to register executor at {}:{}, could not connect: {:?}",
- metadata.host, metadata.grpc_port, e
+ "Failed to connect to executor {}: {:?}",
+ executor_id, e
))
})?;
- Ok(())
- }
- #[cfg(test)]
- async fn test_scheduler_connectivity(
- &self,
- _metadata: &ExecutorMetadata,
- ) -> Result<()> {
Ok(())
}
@@ -433,289 +393,45 @@ impl ExecutorManager {
})
.collect::<Vec<_>>()
}
-}
-
-#[cfg(test)]
-mod test {
- use crate::config::{SchedulerConfig, TaskDistribution};
- use std::sync::Arc;
-
- use crate::scheduler_server::timestamp_secs;
- use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
- use crate::test_utils::test_cluster_context;
- use ballista_core::error::Result;
- use ballista_core::serde::protobuf::executor_status::Status;
- use ballista_core::serde::protobuf::{ExecutorHeartbeat, ExecutorStatus};
- use ballista_core::serde::scheduler::{
- ExecutorData, ExecutorMetadata, ExecutorSpecification,
- };
-
- #[tokio::test]
- async fn test_reserve_and_cancel() -> Result<()> {
- test_reserve_and_cancel_inner(TaskDistribution::Bias).await?;
- test_reserve_and_cancel_inner(TaskDistribution::RoundRobin).await?;
-
- Ok(())
- }
-
- async fn test_reserve_and_cancel_inner(
- task_distribution: TaskDistribution,
- ) -> Result<()> {
- let cluster = test_cluster_context();
-
- let config = SchedulerConfig::default().with_task_distribution(task_distribution);
- let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
- let executors = test_executors(10, 4);
-
- for (executor_metadata, executor_data) in executors {
- executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
- }
-
- // Reserve all the slots
- let reservations = executor_manager.reserve_slots(40).await?;
-
- assert_eq!(
- reservations.len(),
- 40,
- "Expected 40 reservations for policy {task_distribution:?}"
- );
-
- // Now cancel them
- executor_manager.cancel_reservations(reservations).await?;
-
- // Now reserve again
- let reservations = executor_manager.reserve_slots(40).await?;
-
- assert_eq!(
- reservations.len(),
- 40,
- "Expected 40 reservations for policy {task_distribution:?}"
- );
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_reserve_partial() -> Result<()> {
- test_reserve_partial_inner(TaskDistribution::Bias).await?;
- test_reserve_partial_inner(TaskDistribution::RoundRobin).await?;
-
- Ok(())
- }
-
- async fn test_reserve_partial_inner(
- task_distribution: TaskDistribution,
- ) -> Result<()> {
- let cluster = test_cluster_context();
-
- let config = SchedulerConfig::default().with_task_distribution(task_distribution);
- let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
- let executors = test_executors(10, 4);
-
- for (executor_metadata, executor_data) in executors {
- executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
- }
-
- // Reserve all the slots
- let reservations = executor_manager.reserve_slots(30).await?;
-
- assert_eq!(reservations.len(), 30);
-
- // Try to reserve 30 more. Only ten are available though so we should only get 10
- let more_reservations = executor_manager.reserve_slots(30).await?;
-
- assert_eq!(more_reservations.len(), 10);
-
- // Now cancel them
- executor_manager.cancel_reservations(reservations).await?;
- executor_manager
- .cancel_reservations(more_reservations)
- .await?;
-
- // Now reserve again
- let reservations = executor_manager.reserve_slots(40).await?;
-
- assert_eq!(reservations.len(), 40);
-
- let more_reservations = executor_manager.reserve_slots(30).await?;
-
- assert_eq!(more_reservations.len(), 0);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_reserve_concurrent() -> Result<()> {
- test_reserve_concurrent_inner(TaskDistribution::Bias).await?;
- test_reserve_concurrent_inner(TaskDistribution::RoundRobin).await?;
-
- Ok(())
- }
-
- async fn test_reserve_concurrent_inner(
- task_distribution: TaskDistribution,
- ) -> Result<()> {
- let (sender, mut receiver) =
- tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
- let executors = test_executors(10, 4);
-
- let config = SchedulerConfig::default().with_task_distribution(task_distribution);
- let cluster = test_cluster_context();
- let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
+ async fn get_client(&self, executor_id: &str) -> Result<ExecutorGrpcClient<Channel>> {
+ let client = self.clients.get(executor_id).map(|value| value.clone());
- for (executor_metadata, executor_data) in executors {
- executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
- }
+ if let Some(client) = client {
+ Ok(client)
+ } else {
+ let executor_metadata = self.get_executor_metadata(executor_id).await?;
+ let executor_url = format!(
+ "http://{}:{}",
+ executor_metadata.host, executor_metadata.grpc_port
+ );
+ let connection = create_grpc_client_connection(executor_url).await?;
+ let client = ExecutorGrpcClient::new(connection);
- {
- let sender = sender;
- // Spawn 20 async tasks to each try and reserve all 40 slots
- for _ in 0..20 {
- let executor_manager = executor_manager.clone();
- let sender = sender.clone();
- tokio::task::spawn(async move {
- let reservations = executor_manager.reserve_slots(40).await;
- sender.send(reservations).await.unwrap();
- });
+ {
+ self.clients.insert(executor_id.to_owned(), client.clone());
}
+ Ok(client)
}
-
- let mut total_reservations: Vec<ExecutorReservation> = vec![];
-
- while let Some(Ok(reservations)) = receiver.recv().await {
- total_reservations.extend(reservations);
- }
-
- // The total number of reservations should never exceed the number of slots
- assert_eq!(total_reservations.len(), 40);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_register_reserve() -> Result<()> {
- test_register_reserve_inner(TaskDistribution::Bias).await?;
- test_register_reserve_inner(TaskDistribution::RoundRobin).await?;
-
- Ok(())
- }
-
- async fn test_register_reserve_inner(
- task_distribution: TaskDistribution,
- ) -> Result<()> {
- let cluster = test_cluster_context();
-
- let config = SchedulerConfig::default().with_task_distribution(task_distribution);
- let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
- let executors = test_executors(10, 4);
-
- for (executor_metadata, executor_data) in executors {
- let reservations = executor_manager
- .register_executor(executor_metadata, executor_data, true)
- .await?;
-
- assert_eq!(reservations.len(), 4);
- }
-
- // All slots should be reserved
- let reservations = executor_manager.reserve_slots(1).await?;
-
- assert_eq!(reservations.len(), 0);
-
- Ok(())
}
- #[tokio::test]
- async fn test_ignore_fenced_executors() -> Result<()> {
- test_ignore_fenced_executors_inner(TaskDistribution::Bias).await?;
- test_ignore_fenced_executors_inner(TaskDistribution::RoundRobin).await?;
-
+ #[cfg(not(test))]
+ async fn test_connectivity(metadata: &ExecutorMetadata) -> Result<()> {
+ let executor_url = format!("http://{}:{}", metadata.host, metadata.grpc_port);
+ debug!("Connecting to executor {:?}", executor_url);
+ let _ = protobuf::executor_grpc_client::ExecutorGrpcClient::connect(executor_url)
+ .await
+ .map_err(|e| {
+ BallistaError::Internal(format!(
+ "Failed to register executor at {}:{}, could not connect: {:?}",
+ metadata.host, metadata.grpc_port, e
+ ))
+ })?;
Ok(())
}
- async fn test_ignore_fenced_executors_inner(
- task_distribution: TaskDistribution,
- ) -> Result<()> {
- let cluster = test_cluster_context();
-
- let config = SchedulerConfig::default().with_task_distribution(task_distribution);
- let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
-
- // Setup two executors initially
- let executors = test_executors(2, 4);
-
- for (executor_metadata, executor_data) in executors {
- let _ = executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
- }
-
- // Fence one of the executors
- executor_manager
- .save_executor_heartbeat(ExecutorHeartbeat {
- executor_id: "executor-0".to_string(),
- timestamp: timestamp_secs(),
- metrics: vec![],
- status: Some(ExecutorStatus {
- status: Some(Status::Terminating(String::default())),
- }),
- })
- .await?;
-
- let reservations = executor_manager.reserve_slots(8).await?;
-
- assert_eq!(reservations.len(), 4, "Expected only four reservations");
-
- assert!(
- reservations
- .iter()
- .all(|res| res.executor_id == "executor-1"),
- "Expected all reservations from non-fenced executor",
- );
-
+ #[cfg(test)]
+ async fn test_connectivity(_metadata: &ExecutorMetadata) -> Result<()> {
Ok(())
}
-
- fn test_executors(
- total_executors: usize,
- slots_per_executor: u32,
- ) -> Vec<(ExecutorMetadata, ExecutorData)> {
- let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
-
- for i in 0..total_executors {
- result.push((
- ExecutorMetadata {
- id: format!("executor-{i}"),
- host: format!("host-{i}"),
- port: 8080,
- grpc_port: 9090,
- specification: ExecutorSpecification {
- task_slots: slots_per_executor,
- },
- },
- ExecutorData {
- executor_id: format!("executor-{i}"),
- total_task_slots: slots_per_executor,
- available_task_slots: slots_per_executor,
- },
- ));
- }
-
- result
- }
}
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 2ad2d707..957bbcfe 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -26,23 +26,23 @@ use std::time::Instant;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
+use crate::state::executor_manager::ExecutorManager;
use crate::state::session_manager::SessionManager;
use crate::state::task_manager::{TaskLauncher, TaskManager};
-use crate::cluster::BallistaCluster;
+use crate::cluster::{BallistaCluster, BoundTask, ExecutorSlot};
use crate::config::SchedulerConfig;
use crate::state::execution_graph::TaskDescription;
use ballista_core::error::{BallistaError, Result};
+use ballista_core::event_loop::EventSender;
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
use prost::Message;
pub mod execution_graph;
@@ -155,139 +155,105 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
self.executor_manager.init().await
}
- pub(crate) async fn update_task_statuses(
+ pub(crate) async fn revive_offers(
&self,
- executor_id: &str,
- tasks_status: Vec<TaskStatus>,
- ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
- let executor = self
+ sender: EventSender<QueryStageSchedulerEvent>,
+ ) -> Result<()> {
+ let schedulable_tasks = self
.executor_manager
- .get_executor_metadata(executor_id)
+ .bind_schedulable_tasks(self.task_manager.get_running_job_cache())
.await?;
+ if schedulable_tasks.is_empty() {
+ warn!("No schedulable tasks found to be launched");
+ return Ok(());
+ }
- let total_num_tasks = tasks_status.len();
- let reservations = (0..total_num_tasks)
- .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
- .collect();
-
- let events = self
- .task_manager
- .update_task_statuses(&executor, tasks_status)
- .await?;
-
- Ok((events, reservations))
- }
-
- /// Process reservations which are offered. The basic process is
- /// 1. Attempt to fill the offered reservations with available tasks
- /// 2. For any reservation that filled, launch the assigned task on the executor.
- /// 3. For any reservations that could not be filled, cancel the reservation (i.e. return the
- /// task slot back to the pool of available task slots).
- ///
- /// NOTE Error handling in this method is very important. No matter what we need to ensure
- /// that unfilled reservations are cancelled or else they could become permanently "invisible"
- /// to the scheduler.
- pub(crate) async fn offer_reservation(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<(Vec<ExecutorReservation>, usize)> {
- let pending_tasks = match self.task_manager.fill_reservations(&reservations).await
- {
- Ok((assignments, unassigned_reservations, pending_tasks)) => {
- let executor_stage_assignments = Self::combine_task(assignments);
-
- self.spawn_tasks_and_persist_reservations_back(
- executor_stage_assignments,
- unassigned_reservations,
- );
-
- pending_tasks
+ let state = self.clone();
+ tokio::spawn(async move {
+ let mut if_revive = false;
+ match state.launch_tasks(schedulable_tasks).await {
+ Ok(unassigned_executor_slots) => {
+ if !unassigned_executor_slots.is_empty() {
+ if let Err(e) = state
+ .executor_manager
+ .unbind_tasks(unassigned_executor_slots)
+ .await
+ {
+ error!("Fail to unbind tasks: {}", e);
+ }
+ if_revive = true;
+ }
+ }
+ Err(e) => {
+ error!("Fail to launch tasks: {}", e);
+ if_revive = true;
+ }
}
- // If error set all reservations back
- Err(e) => {
- error!("Error filling reservations: {:?}", e);
- self.executor_manager
- .cancel_reservations(reservations)
- .await?;
- 0
+ if if_revive {
+ if let Err(e) = sender
+ .post_event(QueryStageSchedulerEvent::ReviveOffers)
+ .await
+ {
+ error!("Fail to send revive offers event due to {:?}", e);
+ }
}
- };
-
- let mut new_reservations = vec![];
-
- if pending_tasks > 0 {
- // If there are pending tasks available, try and schedule them
- let pending_reservations = self
- .executor_manager
- .reserve_slots(pending_tasks as u32)
- .await?;
- new_reservations.extend(pending_reservations);
- }
+ });
- Ok((new_reservations, pending_tasks))
+ Ok(())
}
- fn spawn_tasks_and_persist_reservations_back(
+ /// Remove an executor.
+ /// 1. The executor related info will be removed from [`ExecutorManager`]
+ /// 2. All of the affected running execution graphs will be rolled back
+ /// 3. All of the running tasks of the affected running stages will be cancelled
+ pub(crate) async fn remove_executor(
&self,
- executor_stage_assignments: HashMap<
- String,
- HashMap<(String, usize), Vec<TaskDescription>>,
- >,
- mut unassigned_reservations: Vec<ExecutorReservation>,
+ executor_id: &str,
+ reason: Option<String>,
) {
- let task_manager = self.task_manager.clone();
- let executor_manager = self.executor_manager.clone();
-
- tokio::spawn(async move {
- for (executor_id, tasks) in executor_stage_assignments.into_iter() {
- let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
- // Total number of tasks to be launched for one executor
- let n_tasks: usize =
- tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+ if let Err(e) = self
+ .executor_manager
+ .remove_executor(executor_id, reason)
+ .await
+ {
+ warn!("Fail to remove executor {}: {}", executor_id, e);
+ }
- match executor_manager.get_executor_metadata(&executor_id).await {
- Ok(executor) => {
- if let Err(e) = task_manager
- .launch_multi_task(&executor, tasks, &executor_manager)
- .await
- {
- error!("Failed to launch new task: {:?}", e);
- // set resource back.
- unassigned_reservations.append(&mut vec![
- ExecutorReservation::new_free(
- executor_id.clone(),
- );
- n_tasks
- ]);
- }
+ match self.task_manager.executor_lost(executor_id).await {
+ Ok(tasks) => {
+ if !tasks.is_empty() {
+ if let Err(e) =
+ self.executor_manager.cancel_running_tasks(tasks).await
+ {
+ warn!("Fail to cancel running tasks due to {:?}", e);
}
- Err(e) => {
- error!("Failed to launch new task, could not get executor metadata: {:?}", e);
- // here no need set resource back.
- }
- };
+ }
}
- if !unassigned_reservations.is_empty() {
- // If any reserved slots remain, return them to the pool
- executor_manager
- .cancel_reservations(unassigned_reservations)
- .await
- .expect("cancel_reservations fail!");
+ Err(e) => {
+ error!(
+ "TaskManager error to handle Executor {} lost: {}",
+ executor_id, e
+ );
}
- });
+ }
}
- // Put tasks to the same executor together
- // And put tasks belonging to the same stage together for creating MultiTaskDefinition
- // return a map of <executor_id, <stage_key, TaskDesc>>.
- fn combine_task(
- assignments: Vec<(String, TaskDescription)>,
- ) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
+ /// Given a vector of bound tasks,
+ /// 1. Firstly reorganize according to: executor -> job stage -> tasks;
+ /// 2. Then launch the task set vector to each executor one by one.
+ ///
+ /// If it fails to launch a task set, the related [`ExecutorSlot`] will be returned.
+ async fn launch_tasks(
+ &self,
+ bound_tasks: Vec<BoundTask>,
+ ) -> Result<Vec<ExecutorSlot>> {
+ // Put tasks to the same executor together
+ // And put tasks belonging to the same stage together for creating MultiTaskDefinition
let mut executor_stage_assignments: HashMap<
String,
HashMap<(String, usize), Vec<TaskDescription>>,
> = HashMap::new();
- for (executor_id, task) in assignments.into_iter() {
+ for (executor_id, task) in bound_tasks.into_iter() {
let stage_key = (task.partition.job_id.clone(), task.partition.stage_id);
if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id) {
if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
@@ -304,15 +270,90 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
executor_stage_assignments.insert(executor_id, executor_stage_tasks);
}
}
- executor_stage_assignments
+
+ let mut join_handles = vec![];
+ for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+ let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+ // Total number of tasks to be launched for one executor
+ let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
+ let state = self.clone();
+ let join_handle = tokio::spawn(async move {
+ let success = match state
+ .executor_manager
+ .get_executor_metadata(&executor_id)
+ .await
+ {
+ Ok(executor) => {
+ if let Err(e) = state
+ .task_manager
+ .launch_multi_task(&executor, tasks, &state.executor_manager)
+ .await
+ {
+ let err_msg = format!("Failed to launch new task: {e}");
+ error!("{}", err_msg.clone());
+
+ // It's OK to remove executor aggressively,
+ // since if the executor is in healthy state, it will be registered again.
+ state.remove_executor(&executor_id, Some(err_msg)).await;
+
+ false
+ } else {
+ true
+ }
+ }
+ Err(e) => {
+ error!("Failed to launch new task, could not get executor metadata: {}", e);
+ false
+ }
+ };
+ if success {
+ vec![]
+ } else {
+ vec![(executor_id.clone(), n_tasks as u32)]
+ }
+ });
+ join_handles.push(join_handle);
+ }
+
+ let unassigned_executor_slots =
+ futures::future::join_all(join_handles)
+ .await
+ .into_iter()
+ .collect::<std::result::Result<
+ Vec<Vec<ExecutorSlot>>,
+ tokio::task::JoinError,
+ >>()?;
+
+ Ok(unassigned_executor_slots
+ .into_iter()
+ .flatten()
+ .collect::<Vec<ExecutorSlot>>())
}
- pub(crate) async fn plan_job(
+ pub(crate) async fn update_task_statuses(
+ &self,
+ executor_id: &str,
+ tasks_status: Vec<TaskStatus>,
+ ) -> Result<Vec<QueryStageSchedulerEvent>> {
+ let executor = self
+ .executor_manager
+ .get_executor_metadata(executor_id)
+ .await?;
+
+ self.task_manager
+ .update_task_statuses(&executor, tasks_status)
+ .await
+ }
+
+ pub(crate) async fn submit_job(
&self,
job_id: &str,
+ job_name: &str,
session_ctx: Arc<SessionContext>,
plan: &LogicalPlan,
- ) -> Result<Arc<dyn ExecutionPlan>> {
+ queued_at: u64,
+ ) -> Result<()> {
let start = Instant::now();
if log::max_level() >= log::Level::Debug {
@@ -349,10 +390,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
.map_err(|e| {
DataFusionError::External(
format!(
- "logical plan refers to path on local file system \
+ "logical plan refers to path on local file system \
that is not accessible in the scheduler: {url}: {e:?}"
- )
- .into(),
+ )
+ .into(),
)
})?;
}
@@ -367,11 +408,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
DisplayableExecutionPlan::new(plan.as_ref()).indent()
);
+ self.task_manager
+ .submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
+ .await?;
+
let elapsed = start.elapsed();
info!("Planned job {} in {:?}", job_id, elapsed);
- Ok(plan)
+ Ok(())
}
/// Spawn a delayed future to clean up job data on both Scheduler and Executors
@@ -395,320 +440,3 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
);
}
}
-
-#[cfg(test)]
-mod test {
-
- use crate::state::SchedulerState;
- use ballista_core::config::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
- use ballista_core::error::Result;
- use ballista_core::serde::protobuf::{
- task_status, ShuffleWritePartition, SuccessfulTask, TaskStatus,
- };
- use ballista_core::serde::scheduler::{
- ExecutorData, ExecutorMetadata, ExecutorSpecification,
- };
- use ballista_core::serde::BallistaCodec;
-
- use crate::config::SchedulerConfig;
-
- use crate::scheduler_server::timestamp_millis;
- use crate::test_utils::{test_cluster_context, BlackholeTaskLauncher};
- use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::logical_expr::{col, sum};
- use datafusion::physical_plan::ExecutionPlan;
- use datafusion::prelude::SessionContext;
- use datafusion::test_util::scan_empty;
- use datafusion_proto::protobuf::LogicalPlanNode;
- use datafusion_proto::protobuf::PhysicalPlanNode;
- use std::sync::Arc;
-
- const TEST_SCHEDULER_NAME: &str = "localhost:50050";
-
- // We should free any reservations which are not assigned
- #[tokio::test]
- async fn test_offer_free_reservations() -> Result<()> {
- let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_default_scheduler_name(
- test_cluster_context(),
- BallistaCodec::default(),
- ));
-
- let executors = test_executors(1, 4);
-
- let (executor_metadata, executor_data) = executors[0].clone();
-
- let reservations = state
- .executor_manager
- .register_executor(executor_metadata, executor_data, true)
- .await?;
-
- let (result, assigned) = state.offer_reservation(reservations).await?;
-
- assert_eq!(assigned, 0);
- assert!(result.is_empty());
-
- // Need sleep wait for the spawn task work done.
- tokio::time::sleep(std::time::Duration::from_secs(1)).await;
- // All reservations should have been cancelled so we should be able to reserve them now
- let reservations = state.executor_manager.reserve_slots(4).await?;
-
- assert_eq!(reservations.len(), 4);
-
- Ok(())
- }
-
- // We should fill unbound reservations to any available task
- #[tokio::test]
- async fn test_offer_fill_reservations() -> Result<()> {
- let config = BallistaConfig::builder()
- .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
- .build()?;
-
- let scheduler_config = SchedulerConfig::default();
- let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_task_launcher(
- test_cluster_context(),
- BallistaCodec::default(),
- TEST_SCHEDULER_NAME.into(),
- Arc::new(scheduler_config),
- Arc::new(BlackholeTaskLauncher::default()),
- ));
-
- let session_ctx = state.session_manager.create_session(&config).await?;
-
- let plan = test_graph(session_ctx.clone()).await;
-
- // Create 4 jobs so we have four pending tasks
- state
- .task_manager
- .queue_job("job-1", "", timestamp_millis())
- .await?;
- state
- .task_manager
- .submit_job(
- "job-1",
- "",
- session_ctx.session_id().as_str(),
- plan.clone(),
- 0,
- )
- .await?;
- state
- .task_manager
- .queue_job("job-2", "", timestamp_millis())
- .await?;
- state
- .task_manager
- .submit_job(
- "job-2",
- "",
- session_ctx.session_id().as_str(),
- plan.clone(),
- 0,
- )
- .await?;
- state
- .task_manager
- .queue_job("job-3", "", timestamp_millis())
- .await?;
- state
- .task_manager
- .submit_job(
- "job-3",
- "",
- session_ctx.session_id().as_str(),
- plan.clone(),
- 0,
- )
- .await?;
- state
- .task_manager
- .queue_job("job-4", "", timestamp_millis())
- .await?;
- state
- .task_manager
- .submit_job(
- "job-4",
- "",
- session_ctx.session_id().as_str(),
- plan.clone(),
- 0,
- )
- .await?;
-
- let executors = test_executors(1, 4);
-
- let (executor_metadata, executor_data) = executors[0].clone();
-
- let reservations = state
- .executor_manager
- .register_executor(executor_metadata, executor_data, true)
- .await?;
-
- let (result, pending) = state.offer_reservation(reservations).await?;
-
- assert_eq!(pending, 0);
- assert!(result.is_empty());
-
- // All task slots should be assigned so we should not be able to reserve more tasks
- let reservations = state.executor_manager.reserve_slots(4).await?;
-
- assert_eq!(reservations.len(), 0);
-
- Ok(())
- }
-
- // We should generate a new event for tasks that are still pending
- #[tokio::test]
- async fn test_offer_resubmit_pending() -> Result<()> {
- let config = BallistaConfig::builder()
- .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
- .build()?;
-
- let scheduler_config = SchedulerConfig::default();
- let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_task_launcher(
- test_cluster_context(),
- BallistaCodec::default(),
- TEST_SCHEDULER_NAME.into(),
- Arc::new(scheduler_config),
- Arc::new(BlackholeTaskLauncher::default()),
- ));
-
- let session_ctx = state.session_manager.create_session(&config).await?;
-
- let plan = test_graph(session_ctx.clone()).await;
-
- // Create a job
- state
- .task_manager
- .queue_job("job-1", "", timestamp_millis())
- .await?;
- state
- .task_manager
- .submit_job(
- "job-1",
- "",
- session_ctx.session_id().as_str(),
- plan.clone(),
- 0,
- )
- .await?;
-
- let executors = test_executors(1, 4);
-
- let (executor_metadata, executor_data) = executors[0].clone();
-
- // Complete the first stage. So we should now have 4 pending tasks for this job stage 2
- {
- let plan_graph = state
- .task_manager
- .get_active_execution_graph("job-1")
- .unwrap();
- let task_def = plan_graph
- .write()
- .await
- .pop_next_task(&executor_data.executor_id)?
- .unwrap();
- let mut partitions: Vec<ShuffleWritePartition> = vec![];
- for partition_id in 0..4 {
- partitions.push(ShuffleWritePartition {
- partition_id: partition_id as u64,
- path: "some/path".to_string(),
- num_batches: 1,
- num_rows: 1,
- num_bytes: 1,
- })
- }
- state
- .task_manager
- .update_task_statuses(
- &executor_metadata,
- vec![TaskStatus {
- task_id: task_def.task_id as u32,
- job_id: "job-1".to_string(),
- stage_id: task_def.partition.stage_id as u32,
- stage_attempt_num: task_def.stage_attempt_num as u32,
- partition_id: task_def.partition.partition_id as u32,
- launch_time: 0,
- start_exec_time: 0,
- end_exec_time: 0,
- metrics: vec![],
- status: Some(task_status::Status::Successful(SuccessfulTask {
- executor_id: executor_data.executor_id.clone(),
- partitions,
- })),
- }],
- )
- .await?;
- }
-
- state
- .executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
-
- let reservations = state.executor_manager.reserve_slots(1).await?;
-
- assert_eq!(reservations.len(), 1);
-
- // Offer the reservation. It should be filled with one of the 4 pending tasks. The other 3 should
- // be reserved for the other 3 tasks, emitting another offer event
- let (reservations, pending) = state.offer_reservation(reservations).await?;
-
- assert_eq!(pending, 3);
- assert_eq!(reservations.len(), 3);
-
- // Remaining 3 task slots should be reserved for pending tasks
- let reservations = state.executor_manager.reserve_slots(4).await?;
-
- assert_eq!(reservations.len(), 0);
-
- Ok(())
- }
-
- fn test_executors(
- total_executors: usize,
- slots_per_executor: u32,
- ) -> Vec<(ExecutorMetadata, ExecutorData)> {
- let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
-
- for i in 0..total_executors {
- result.push((
- ExecutorMetadata {
- id: format!("executor-{i}"),
- host: format!("host-{i}"),
- port: 8080,
- grpc_port: 9090,
- specification: ExecutorSpecification {
- task_slots: slots_per_executor,
- },
- },
- ExecutorData {
- executor_id: format!("executor-{i}"),
- total_task_slots: slots_per_executor,
- available_task_slots: slots_per_executor,
- },
- ));
- }
-
- result
- }
-
- async fn test_graph(ctx: Arc<SessionContext>) -> Arc<dyn ExecutionPlan> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Utf8, false),
- Field::new("gmv", DataType::UInt64, false),
- ]);
-
- let plan = scan_empty(None, &schema, Some(vec![0, 1]))
- .unwrap()
- .aggregate(vec![col("id")], vec![sum(col("gmv"))])
- .unwrap()
- .build()
- .unwrap();
-
- ctx.state().create_physical_plan(&plan).await.unwrap()
- }
-}
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 6bf245ca..67f2857b 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -20,20 +20,20 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::execution_graph::{
ExecutionGraph, ExecutionStage, RunningTaskInfo, TaskDescription,
};
-use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
+use crate::state::executor_manager::ExecutorManager;
use ballista_core::error::BallistaError;
use ballista_core::error::Result;
use crate::cluster::JobState;
use ballista_core::serde::protobuf::{
- self, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
+ job_status, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
use ballista_core::serde::BallistaCodec;
use dashmap::DashMap;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use log::{debug, error, info, warn};
@@ -101,19 +101,9 @@ impl TaskLauncher for DefaultTaskLauncher {
executor.id, tasks_ids
);
}
- let mut client = executor_manager.get_client(&executor.id).await?;
- client
- .launch_multi_task(protobuf::LaunchMultiTaskParams {
- multi_tasks: tasks,
- scheduler_id: self.scheduler_id.clone(),
- })
- .await
- .map_err(|e| {
- BallistaError::Internal(format!(
- "Failed to connect to executor {}: {:?}",
- executor.id, e
- ))
- })?;
+ executor_manager
+ .launch_multi_task(&executor.id, tasks, self.scheduler_id.clone())
+ .await?;
Ok(())
}
}
@@ -129,17 +119,21 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
#[derive(Clone)]
-struct JobInfoCache {
+pub struct JobInfoCache {
// Cache for active execution graphs curated by this scheduler
- execution_graph: Arc<RwLock<ExecutionGraph>>,
+ pub execution_graph: Arc<RwLock<ExecutionGraph>>,
+ // Cache for job status
+ pub status: Option<job_status::Status>,
// Cache for encoded execution stage plan to avoid duplicated encoding for multiple tasks
encoded_stage_plans: HashMap<usize, Vec<u8>>,
}
impl JobInfoCache {
- fn new(graph: ExecutionGraph) -> Self {
+ pub fn new(graph: ExecutionGraph) -> Self {
+ let status = graph.status().status.clone();
Self {
execution_graph: Arc::new(RwLock::new(graph)),
+ status,
encoded_stage_plans: HashMap::new(),
}
}
@@ -186,13 +180,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
}
/// Enqueue a job for scheduling
- pub async fn queue_job(
- &self,
- job_id: &str,
- job_name: &str,
- queued_at: u64,
- ) -> Result<()> {
- self.state.accept_job(job_id, job_name, queued_at).await
+ pub fn queue_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
+ self.state.accept_job(job_id, job_name, queued_at)
+ }
+
+ /// Get the number of queued (pending) jobs. A large value means the scheduler is too busy.
+ /// Under normal load this should be 0.
+ pub fn pending_job_number(&self) -> usize {
+ self.state.pending_job_number()
+ }
+
+ /// Get the number of running jobs, i.e. the number of entries in the active job cache.
+ pub fn running_job_number(&self) -> usize {
+ self.active_job_cache.len()
}
/// Generate an ExecutionGraph for the job and save it to the persistent state.
@@ -225,6 +225,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
Ok(())
}
+ pub fn get_running_job_cache(&self) -> Arc<HashMap<String, JobInfoCache>> {
+ let ret = self
+ .active_job_cache
+ .iter()
+ .filter_map(|pair| {
+ let (job_id, job_info) = pair.pair();
+ if matches!(job_info.status, Some(job_status::Status::Running(_))) {
+ Some((job_id.clone(), job_info.clone()))
+ } else {
+ None
+ }
+ })
+ .collect::<HashMap<_, _>>();
+ Arc::new(ret)
+ }
+
/// Get a list of active job ids
pub async fn get_jobs(&self) -> Result<Vec<JobOverview>> {
let job_ids = self.state.get_jobs().await?;
@@ -251,7 +267,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
if let Some(graph) = self.get_active_execution_graph(job_id) {
let guard = graph.read().await;
- Ok(Some(guard.status()))
+ Ok(Some(guard.status().clone()))
} else {
self.state.get_job_status(job_id).await
}
@@ -320,61 +336,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
Ok(events)
}
- /// Take a list of executor reservations and fill them with tasks that are ready
- /// to be scheduled.
- ///
- /// Here we use the following algorithm:
- ///
- /// 1. For each free reservation, try to assign a task from one of the active jobs
- /// 2. If we cannot find a task in all active jobs, then add the reservation to the list of unassigned reservations
- ///
- /// Finally, we return:
- /// 1. A list of assignments which is a (Executor ID, Task) tuple
- /// 2. A list of unassigned reservations which we could not find tasks for
- /// 3. The number of pending tasks across active jobs
- pub async fn fill_reservations(
- &self,
- reservations: &[ExecutorReservation],
- ) -> Result<(
- Vec<(String, TaskDescription)>,
- Vec<ExecutorReservation>,
- usize,
- )> {
- // Reinitialize the free reservations.
- let free_reservations: Vec<ExecutorReservation> = reservations
- .iter()
- .map(|reservation| {
- ExecutorReservation::new_free(reservation.executor_id.clone())
- })
- .collect();
-
- let mut assignments: Vec<(String, TaskDescription)> = vec![];
- let mut pending_tasks = 0usize;
- let mut assign_tasks = 0usize;
- for pairs in self.active_job_cache.iter() {
- let (_job_id, job_info) = pairs.pair();
- let mut graph = job_info.execution_graph.write().await;
- for reservation in free_reservations.iter().skip(assign_tasks) {
- if let Some(task) = graph.pop_next_task(&reservation.executor_id)? {
- assignments.push((reservation.executor_id.clone(), task));
- assign_tasks += 1;
- } else {
- break;
- }
- }
- if assign_tasks >= free_reservations.len() {
- pending_tasks += graph.available_tasks();
- break;
- }
- }
-
- let mut unassigned = vec![];
- for reservation in free_reservations.iter().skip(assign_tasks) {
- unassigned.push(reservation.clone());
- }
- Ok((assignments, unassigned, pending_tasks))
- }
-
/// Mark a job to success. This will create a key under the CompletedJobs keyspace
/// and remove the job from ActiveJobs
pub(crate) async fn succeed_job(&self, job_id: &str) -> Result<()> {
@@ -410,7 +371,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
failure_reason: String,
) -> Result<(Vec<RunningTaskInfo>, usize)> {
let (tasks_to_cancel, pending_tasks) = if let Some(graph) =
- self.get_active_execution_graph(job_id)
+ self.remove_active_execution_graph(job_id)
{
let mut guard = graph.write().await;
@@ -717,7 +678,7 @@ impl From<&ExecutionGraph> for JobOverview {
Self {
job_id: value.job_id().to_string(),
job_name: value.job_name().to_string(),
- status: value.status(),
+ status: value.status().clone(),
start_time: value.start_time(),
end_time: value.end_time(),
num_stages: value.stage_count(),
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index 1821c729..fe274688 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -55,7 +55,6 @@ use datafusion::test_util::scan_empty;
use crate::cluster::BallistaCluster;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
use crate::state::execution_graph::{ExecutionGraph, TaskDescription};
use ballista_core::utils::default_session_builder;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
@@ -451,7 +450,7 @@ impl SchedulerTest {
scheduler
.state
.executor_manager
- .register_executor(metadata, executor_data, false)
+ .register_executor(metadata, executor_data)
.await?;
}
@@ -462,8 +461,12 @@ impl SchedulerTest {
})
}
- pub fn pending_tasks(&self) -> usize {
- self.scheduler.pending_tasks()
+ pub fn pending_job_number(&self) -> usize {
+ self.scheduler.pending_job_number()
+ }
+
+ pub fn running_job_number(&self) -> usize {
+ self.scheduler.running_job_number()
}
pub async fn ctx(&self) -> Result<Arc<SessionContext>> {
@@ -655,12 +658,6 @@ impl SchedulerTest {
final_status
}
-
- pub(crate) fn query_stage_scheduler(
- &self,
- ) -> Arc<QueryStageScheduler<LogicalPlanNode, PhysicalPlanNode>> {
- self.scheduler.query_stage_scheduler()
- }
}
#[derive(Clone)]
@@ -786,6 +783,13 @@ pub fn assert_failed_event(job_id: &str, collector: &TestMetricsCollector) {
}
pub async fn test_aggregation_plan(partition: usize) -> ExecutionGraph {
+ test_aggregation_plan_with_job_id(partition, "job").await
+}
+
+pub async fn test_aggregation_plan_with_job_id(
+ partition: usize,
+ job_id: &str,
+) -> ExecutionGraph {
let config = SessionConfig::new().with_target_partitions(partition);
let ctx = Arc::new(SessionContext::with_config(config));
let session_state = ctx.state();
@@ -811,7 +815,7 @@ pub async fn test_aggregation_plan(partition: usize) -> ExecutionGraph {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 0).unwrap()
+ ExecutionGraph::new("localhost:50050", job_id, "", "session", plan, 0).unwrap()
}
pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {