Posted to commits@arrow.apache.org by nj...@apache.org on 2022/10/21 01:39:14 UTC
[arrow-ballista] branch master updated: Add round robin executor slots reservation policy for the scheduler to evenly assign tasks to executors (#395)
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 0050ecef Add round robin executor slots reservation policy for the scheduler to evenly assign tasks to executors (#395)
0050ecef is described below
commit 0050eceff46f572a2367c1b93331a94664d9951e
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Fri Oct 21 09:39:09 2022 +0800
Add round robin executor slots reservation policy for the scheduler to evenly assign tasks to executors (#395)
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/scheduler/scheduler_config_spec.toml | 6 +
ballista/scheduler/src/{lib.rs => config.rs} | 37 +++--
ballista/scheduler/src/lib.rs | 1 +
ballista/scheduler/src/main.rs | 23 ++-
ballista/scheduler/src/scheduler_server/mod.rs | 15 +-
ballista/scheduler/src/state/executor_manager.rs | 186 +++++++++++++++++++----
ballista/scheduler/src/state/mod.rs | 5 +-
7 files changed, 223 insertions(+), 50 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 6ec78103..625e23b9 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -72,6 +72,12 @@ type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
+[[param]]
+name = "executor_slots_policy"
+type = "ballista_scheduler::config::SlotsPolicy"
+doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. Default: bias"
+default = "ballista_scheduler::config::SlotsPolicy::Bias"
+
[[param]]
name = "plugin_dir"
type = "String"
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/config.rs
similarity index 54%
copy from ballista/scheduler/src/lib.rs
copy to ballista/scheduler/src/config.rs
index d755bc68..4f1f5c24 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/config.rs
@@ -14,18 +14,31 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+//
+
+//! Ballista scheduler-specific configuration
+
+use clap::ArgEnum;
+use std::fmt;
+
+// An enum used to configure the executor slots policy.
+// It needs to be visible to the code generated by configure_me.
+#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
+pub enum SlotsPolicy {
+ Bias,
+ RoundRobin,
+}
-#![doc = include_str ! ("../README.md")]
+impl std::str::FromStr for SlotsPolicy {
+ type Err = String;
-pub mod api;
-pub mod display;
-pub mod planner;
-pub mod scheduler_server;
-#[cfg(feature = "sled")]
-pub mod standalone;
-pub mod state;
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ ArgEnum::from_str(s, true)
+ }
+}
-#[cfg(feature = "flight-sql")]
-pub mod flight_sql;
-#[cfg(test)]
-pub mod test_utils;
+impl parse_arg::ParseArgFromStr for SlotsPolicy {
+ fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+ write!(writer, "The executor slots policy for the scheduler")
+ }
+}
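The FromStr implementation above defers to clap's ArgEnum with case-insensitive matching, so the values documented in scheduler_config_spec.toml parse directly into the enum. A minimal sketch of the expected behavior, assuming clap 3's default kebab-case variant names ("bias", "round-robin"):

    use ballista_scheduler::config::SlotsPolicy;

    fn main() {
        // ArgEnum::from_str(s, true) ignores case, so "Bias" would parse too.
        let bias: SlotsPolicy = "bias".parse().unwrap();
        let rr: SlotsPolicy = "round-robin".parse().unwrap();
        println!("{:?} {:?}", bias, rr); // prints: Bias RoundRobin
    }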
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index d755bc68..98fac309 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -18,6 +18,7 @@
#![doc = include_str ! ("../README.md")]
pub mod api;
+pub mod config;
pub mod display;
pub mod planner;
pub mod scheduler_server;
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 9b09db6a..fafdfa7a 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -62,6 +62,7 @@ mod config {
}
use ballista_core::utils::create_grpc_server;
+use ballista_scheduler::config::SlotsPolicy;
#[cfg(feature = "flight-sql")]
use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
use config::prelude::*;
@@ -71,7 +72,8 @@ async fn start_server(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
addr: SocketAddr,
- policy: TaskSchedulingPolicy,
+ scheduling_policy: TaskSchedulingPolicy,
+ slots_policy: SlotsPolicy,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
@@ -80,14 +82,15 @@ async fn start_server(
// Should only call SchedulerServer::new() once in the process
info!(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
- policy
+ scheduling_policy
);
let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
- match policy {
+ match scheduling_policy {
TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy(
scheduler_name,
config_backend.clone(),
- policy,
+ scheduling_policy,
+ slots_policy,
BallistaCodec::default(),
default_session_builder,
),
@@ -239,7 +242,15 @@ async fn main() -> Result<()> {
}
};
- let policy: TaskSchedulingPolicy = opt.scheduler_policy;
- start_server(scheduler_name, client, addr, policy).await?;
+ let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
+ let slots_policy: SlotsPolicy = opt.executor_slots_policy;
+ start_server(
+ scheduler_name,
+ client,
+ addr,
+ scheduling_policy,
+ slots_policy,
+ )
+ .await?;
Ok(())
}
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 176b85c8..883c6d06 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -30,6 +30,7 @@ use datafusion::logical_plan::LogicalPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::logical_plan::AsLogicalPlan;
+use crate::config::SlotsPolicy;
use log::{error, warn};
use crate::scheduler_server::event::QueryStageSchedulerEvent;
@@ -72,6 +73,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduler_name,
config,
TaskSchedulingPolicy::PullStaged,
+ SlotsPolicy::Bias,
codec,
default_session_builder,
)
@@ -87,6 +89,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduler_name,
config,
TaskSchedulingPolicy::PullStaged,
+ SlotsPolicy::Bias,
codec,
session_builder,
)
@@ -95,7 +98,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
pub fn new_with_policy(
scheduler_name: String,
config: Arc<dyn StateBackendClient>,
- policy: TaskSchedulingPolicy,
+ scheduling_policy: TaskSchedulingPolicy,
+ slots_policy: SlotsPolicy,
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
) -> Self {
@@ -104,9 +108,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
session_builder,
codec,
scheduler_name.clone(),
+ slots_policy,
));
- SchedulerServer::new_with_state(scheduler_name, policy, state)
+ SchedulerServer::new_with_state(scheduler_name, scheduling_policy, state)
}
pub(crate) fn new_with_state(
@@ -294,6 +299,7 @@ mod test {
};
use ballista_core::error::Result;
+ use crate::config::SlotsPolicy;
use ballista_core::serde::protobuf::{
failed_task, job_status, task_status, ExecutionError, FailedTask, JobStatus,
PhysicalPlanNode, ShuffleWritePartition, SuccessfulTask, TaskStatus,
@@ -753,14 +759,15 @@ mod test {
}
async fn test_scheduler(
- policy: TaskSchedulingPolicy,
+ scheduling_policy: TaskSchedulingPolicy,
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new_with_policy(
"localhost:50050".to_owned(),
state_storage.clone(),
- policy,
+ scheduling_policy,
+ SlotsPolicy::Bias,
BallistaCodec::default(),
default_session_builder,
);
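With the extended constructor, callers now choose both policies explicitly. A minimal sketch of standing up a scheduler with round-robin slot reservation, mirroring the test setup above (imports and error handling elided; the values are illustrative, not a full deployment):

    let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
    let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
        SchedulerServer::new_with_policy(
            "localhost:50050".to_owned(),
            state_storage,
            TaskSchedulingPolicy::PushStaged,
            SlotsPolicy::RoundRobin,
            BallistaCodec::default(),
            default_session_builder,
        );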
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 70fc7a0a..2da288fe 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -23,6 +23,7 @@ use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf;
+use crate::config::SlotsPolicy;
use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
@@ -84,6 +85,8 @@ pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
#[derive(Clone)]
pub(crate) struct ExecutorManager {
+ // executor slot policy
+ slots_policy: SlotsPolicy,
state: Arc<dyn StateBackendClient>,
// executor_id -> ExecutorMetadata map
executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
@@ -95,8 +98,12 @@ pub(crate) struct ExecutorManager {
}
impl ExecutorManager {
- pub(crate) fn new(state: Arc<dyn StateBackendClient>) -> Self {
+ pub(crate) fn new(
+ state: Arc<dyn StateBackendClient>,
+ slots_policy: SlotsPolicy,
+ ) -> Self {
Self {
+ slots_policy,
state,
executor_metadata: Arc::new(DashMap::new()),
executors_heartbeat: Arc::new(DashMap::new()),
@@ -121,38 +128,28 @@ impl ExecutorManager {
/// for scheduling.
/// This operation is atomic, so if this method returns an Err, no slots have been reserved.
pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+ self.reserve_slots_global(n).await
+ }
+
+ /// Reserve up to n executor task slots, taking the global resource snapshot into account
+ async fn reserve_slots_global(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
let lock = self.state.lock(Keyspace::Slots, "global").await?;
with_lock(lock, async {
debug!("Attempting to reserve {} executor slots", n);
let start = Instant::now();
- let mut reservations: Vec<ExecutorReservation> = vec![];
- let mut desired: u32 = n;
let alive_executors = self.get_alive_executors_within_one_minute();
- let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
- for executor_id in alive_executors {
- let value = self.state.get(Keyspace::Slots, &executor_id).await?;
- let mut data =
- decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
- let take = std::cmp::min(data.available_task_slots, desired);
-
- for _ in 0..take {
- reservations.push(ExecutorReservation::new_free(executor_id.clone()));
- data.available_task_slots -= 1;
- desired -= 1;
+ let (reservations, txn_ops) = match self.slots_policy {
+ SlotsPolicy::Bias => {
+ self.reserve_slots_global_bias(n, alive_executors).await?
}
-
- let proto: protobuf::ExecutorData = data.into();
- let new_data = encode_protobuf(&proto)?;
- txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
-
- if desired == 0 {
- break;
+ SlotsPolicy::RoundRobin => {
+ self.reserve_slots_global_round_robin(n, alive_executors)
+ .await?
}
- }
+ };
self.state.apply_txn(txn_ops).await?;
@@ -168,6 +165,112 @@ impl ExecutorManager {
.await
}
+ /// Take as many ExecutorReservations as possible from a single executor before
+ /// moving on to the next one. This reduces how often ExecutorData has to be
+ /// decoded and encoded, but it may leave the cluster unbalanced: some executors
+ /// may be very busy while others sit idle.
+ async fn reserve_slots_global_bias(
+ &self,
+ mut n: u32,
+ alive_executors: HashSet<String>,
+ ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+ let mut reservations: Vec<ExecutorReservation> = vec![];
+ let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+ for executor_id in alive_executors {
+ if n == 0 {
+ break;
+ }
+
+ let value = self.state.get(Keyspace::Slots, &executor_id).await?;
+ let mut data = decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
+ let take = std::cmp::min(data.available_task_slots, n);
+
+ for _ in 0..take {
+ reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+ data.available_task_slots -= 1;
+ n -= 1;
+ }
+
+ let proto: protobuf::ExecutorData = data.into();
+ let new_data = encode_protobuf(&proto)?;
+ txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+ }
+
+ Ok((reservations, txn_ops))
+ }
+
+ /// Create ExecutorReservations in a round-robin fashion to assign tasks evenly across executors
+ async fn reserve_slots_global_round_robin(
+ &self,
+ mut n: u32,
+ alive_executors: HashSet<String>,
+ ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+ let mut reservations: Vec<ExecutorReservation> = vec![];
+ let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+ let all_executor_data = self
+ .state
+ .scan(Keyspace::Slots, None)
+ .await?
+ .into_iter()
+ .map(|(_, data)| decode_into::<protobuf::ExecutorData, ExecutorData>(&data))
+ .collect::<Result<Vec<ExecutorData>>>()?;
+
+ let mut available_executor_data: Vec<ExecutorData> = all_executor_data
+ .into_iter()
+ .filter_map(|data| {
+ (data.available_task_slots > 0
+ && alive_executors.contains(&data.executor_id))
+ .then_some(data)
+ })
+ .collect();
+ available_executor_data
+ .sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+
+ // Exclusive upper bound of the range of executors whose slot counts were modified
+ let mut last_updated_idx = 0usize;
+ loop {
+ let n_before = n;
+ for (idx, data) in available_executor_data.iter_mut().enumerate() {
+ if n == 0 {
+ break;
+ }
+
+ // Since the vector is sorted in descending order of available slots,
+ // once one executor has run out of slots, all of the following ones have too
+ if data.available_task_slots == 0 {
+ break;
+ }
+
+ reservations
+ .push(ExecutorReservation::new_free(data.executor_id.clone()));
+ data.available_task_slots -= 1;
+ n -= 1;
+
+ if idx >= last_updated_idx {
+ last_updated_idx = idx + 1;
+ }
+ }
+
+ if n_before == n {
+ break;
+ }
+ }
+
+ for (idx, data) in available_executor_data.into_iter().enumerate() {
+ if idx >= last_updated_idx {
+ break;
+ }
+ let executor_id = data.executor_id.clone();
+ let proto: protobuf::ExecutorData = data.into();
+ let new_data = encode_protobuf(&proto)?;
+ txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+ }
+
+ Ok((reservations, txn_ops))
+ }
+
/// Return reserved task slots to the pool of available slots. This operation is atomic,
/// so either all of the reserved task slots are returned or none are.
pub async fn cancel_reservations(
@@ -638,6 +741,7 @@ impl ExecutorHeartbeatListener {
#[cfg(test)]
mod test {
+ use crate::config::SlotsPolicy;
use crate::state::backend::standalone::StandaloneClient;
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use ballista_core::error::Result;
@@ -648,9 +752,16 @@ mod test {
#[tokio::test]
async fn test_reserve_and_cancel() -> Result<()> {
+ test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
+ test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+
+ Ok(())
+ }
+
+ async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let executor_manager = ExecutorManager::new(state_storage);
+ let executor_manager = ExecutorManager::new(state_storage, slots_policy);
let executors = test_executors(10, 4);
@@ -678,9 +789,16 @@ mod test {
#[tokio::test]
async fn test_reserve_partial() -> Result<()> {
+ test_reserve_partial_inner(SlotsPolicy::Bias).await?;
+ test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+
+ Ok(())
+ }
+
+ async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let executor_manager = ExecutorManager::new(state_storage);
+ let executor_manager = ExecutorManager::new(state_storage, slots_policy);
let executors = test_executors(10, 4);
@@ -720,6 +838,13 @@ mod test {
#[tokio::test]
async fn test_reserve_concurrent() -> Result<()> {
+ test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
+ test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+
+ Ok(())
+ }
+
+ async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> Result<()> {
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
@@ -727,7 +852,7 @@ mod test {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let executor_manager = ExecutorManager::new(state_storage);
+ let executor_manager = ExecutorManager::new(state_storage, slots_policy);
for (executor_metadata, executor_data) in executors {
executor_manager
@@ -762,9 +887,16 @@ mod test {
#[tokio::test]
async fn test_register_reserve() -> Result<()> {
+ test_register_reserve_inner(SlotsPolicy::Bias).await?;
+ test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+
+ Ok(())
+ }
+
+ async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let executor_manager = ExecutorManager::new(state_storage);
+ let executor_manager = ExecutorManager::new(state_storage, slots_policy);
let executors = test_executors(10, 4);
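To make the behavioral difference between the two policies concrete, here is a self-contained sketch of the two reservation loops with the state backend, locking, and protobuf encoding stripped away. The simplified ExecutorSlots type is hypothetical; the real code operates on ExecutorData as shown above. Reserving 6 slots from three executors with 4 free slots each, bias drains one executor first while round-robin leaves every executor with 2 free slots:

    // Hypothetical simplified model: (executor_id, available_task_slots).
    type ExecutorSlots = Vec<(String, u32)>;

    // Bias: drain each executor before moving on to the next.
    fn reserve_bias(executors: &mut ExecutorSlots, mut n: u32) -> Vec<String> {
        let mut reservations = Vec::new();
        for (id, slots) in executors.iter_mut() {
            if n == 0 {
                break;
            }
            let take = (*slots).min(n);
            for _ in 0..take {
                reservations.push(id.clone());
                *slots -= 1;
                n -= 1;
            }
        }
        reservations
    }

    // Round-robin: sort by free slots descending, then take one slot
    // per executor per pass until satisfied or exhausted.
    fn reserve_round_robin(executors: &mut ExecutorSlots, mut n: u32) -> Vec<String> {
        executors.sort_by(|a, b| b.1.cmp(&a.1));
        let mut reservations = Vec::new();
        while n > 0 {
            let n_before = n;
            for (id, slots) in executors.iter_mut() {
                // Sorted descending: once one executor is empty, the rest are too.
                if n == 0 || *slots == 0 {
                    break;
                }
                reservations.push(id.clone());
                *slots -= 1;
                n -= 1;
            }
            if n_before == n {
                break; // all executors exhausted; partial reservation
            }
        }
        reservations
    }

    fn main() {
        let mut bias: ExecutorSlots =
            vec![("a".into(), 4), ("b".into(), 4), ("c".into(), 4)];
        let mut rr = bias.clone();
        reserve_bias(&mut bias, 6);
        reserve_round_robin(&mut rr, 6);
        println!("bias remaining:        {:?}", bias); // [("a",0),("b",2),("c",4)]
        println!("round-robin remaining: {:?}", rr); // [("a",2),("b",2),("c",2)]
    }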
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3de58a0b..6943cbd5 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -28,6 +28,7 @@ use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::session_manager::SessionManager;
use crate::state::task_manager::TaskManager;
+use crate::config::SlotsPolicy;
use crate::state::execution_graph::TaskDescription;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::TaskStatus;
@@ -101,6 +102,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
session_builder,
codec,
"localhost:50050".to_owned(),
+ SlotsPolicy::Bias,
)
}
@@ -109,9 +111,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
scheduler_name: String,
+ slots_policy: SlotsPolicy,
) -> Self {
Self {
- executor_manager: ExecutorManager::new(config_client.clone()),
+ executor_manager: ExecutorManager::new(config_client.clone(), slots_policy),
task_manager: TaskManager::new(
config_client.clone(),
session_builder,
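Downstream, SchedulerState threads the policy into the ExecutorManager it owns. A minimal construction sketch, assuming the constructor shown in the hunk above is SchedulerState::new (the argument values are illustrative):

    // `config_client` is an Arc<dyn StateBackendClient> from the surrounding context.
    let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> = SchedulerState::new(
        config_client.clone(),
        session_builder,
        BallistaCodec::default(),
        "localhost:50050".to_owned(),
        SlotsPolicy::RoundRobin,
    );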