You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2022/10/21 01:39:14 UTC

[arrow-ballista] branch master updated: Add round robin executor slots reservation policy for the scheduler to evenly assign tasks to executors (#395)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 0050ecef Add round robin executor slots reservation policy for the scheduler to evenly assign tasks to executors (#395)
0050ecef is described below

commit 0050eceff46f572a2367c1b93331a94664d9951e
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Fri Oct 21 09:39:09 2022 +0800

    Add round robin executor slots reservation policy for the scheduler to evenly assign tasks to executors (#395)
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/scheduler/scheduler_config_spec.toml    |   6 +
 ballista/scheduler/src/{lib.rs => config.rs}     |  37 +++--
 ballista/scheduler/src/lib.rs                    |   1 +
 ballista/scheduler/src/main.rs                   |  23 ++-
 ballista/scheduler/src/scheduler_server/mod.rs   |  15 +-
 ballista/scheduler/src/state/executor_manager.rs | 186 +++++++++++++++++++----
 ballista/scheduler/src/state/mod.rs              |   5 +-
 7 files changed, 223 insertions(+), 50 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 6ec78103..625e23b9 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -72,6 +72,12 @@ type = "ballista_core::config::TaskSchedulingPolicy"
 doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
 default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
 
+[[param]]
+name = "executor_slots_policy"
+type = "ballista_scheduler::config::SlotsPolicy"
+doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. Default: bias"
+default = "ballista_scheduler::config::SlotsPolicy::Bias"
+
 [[param]]
 name = "plugin_dir"
 type = "String"
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/config.rs
similarity index 54%
copy from ballista/scheduler/src/lib.rs
copy to ballista/scheduler/src/config.rs
index d755bc68..4f1f5c24 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/config.rs
@@ -14,18 +14,31 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+//
+
+//! Ballista scheduler specific configuration
+
+use clap::ArgEnum;
+use std::fmt;
+
+// an enum used to configure the executor slots policy
+// needs to be visible to code generated by configure_me
+#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
+pub enum SlotsPolicy {
+    Bias,
+    RoundRobin,
+}
 
-#![doc = include_str ! ("../README.md")]
+impl std::str::FromStr for SlotsPolicy {
+    type Err = String;
 
-pub mod api;
-pub mod display;
-pub mod planner;
-pub mod scheduler_server;
-#[cfg(feature = "sled")]
-pub mod standalone;
-pub mod state;
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        ArgEnum::from_str(s, true)
+    }
+}
 
-#[cfg(feature = "flight-sql")]
-pub mod flight_sql;
-#[cfg(test)]
-pub mod test_utils;
+impl parse_arg::ParseArgFromStr for SlotsPolicy {
+    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+        write!(writer, "The executor slots policy for the scheduler")
+    }
+}
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index d755bc68..98fac309 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -18,6 +18,7 @@
 #![doc = include_str ! ("../README.md")]
 
 pub mod api;
+pub mod config;
 pub mod display;
 pub mod planner;
 pub mod scheduler_server;
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 9b09db6a..fafdfa7a 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -62,6 +62,7 @@ mod config {
 }
 
 use ballista_core::utils::create_grpc_server;
+use ballista_scheduler::config::SlotsPolicy;
 #[cfg(feature = "flight-sql")]
 use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
 use config::prelude::*;
@@ -71,7 +72,8 @@ async fn start_server(
     scheduler_name: String,
     config_backend: Arc<dyn StateBackendClient>,
     addr: SocketAddr,
-    policy: TaskSchedulingPolicy,
+    scheduling_policy: TaskSchedulingPolicy,
+    slots_policy: SlotsPolicy,
 ) -> Result<()> {
     info!(
         "Ballista v{} Scheduler listening on {:?}",
@@ -80,14 +82,15 @@ async fn start_server(
     // Should only call SchedulerServer::new() once in the process
     info!(
         "Starting Scheduler grpc server with task scheduling policy of {:?}",
-        policy
+        scheduling_policy
     );
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
-        match policy {
+        match scheduling_policy {
             TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy(
                 scheduler_name,
                 config_backend.clone(),
-                policy,
+                scheduling_policy,
+                slots_policy,
                 BallistaCodec::default(),
                 default_session_builder,
             ),
@@ -239,7 +242,15 @@ async fn main() -> Result<()> {
         }
     };
 
-    let policy: TaskSchedulingPolicy = opt.scheduler_policy;
-    start_server(scheduler_name, client, addr, policy).await?;
+    let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
+    let slots_policy: SlotsPolicy = opt.executor_slots_policy;
+    start_server(
+        scheduler_name,
+        client,
+        addr,
+        scheduling_policy,
+        slots_policy,
+    )
+    .await?;
     Ok(())
 }
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 176b85c8..883c6d06 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -30,6 +30,7 @@ use datafusion::logical_plan::LogicalPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 
+use crate::config::SlotsPolicy;
 use log::{error, warn};
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
@@ -72,6 +73,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             scheduler_name,
             config,
             TaskSchedulingPolicy::PullStaged,
+            SlotsPolicy::Bias,
             codec,
             default_session_builder,
         )
@@ -87,6 +89,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             scheduler_name,
             config,
             TaskSchedulingPolicy::PullStaged,
+            SlotsPolicy::Bias,
             codec,
             session_builder,
         )
@@ -95,7 +98,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
     pub fn new_with_policy(
         scheduler_name: String,
         config: Arc<dyn StateBackendClient>,
-        policy: TaskSchedulingPolicy,
+        scheduling_policy: TaskSchedulingPolicy,
+        slots_policy: SlotsPolicy,
         codec: BallistaCodec<T, U>,
         session_builder: SessionBuilder,
     ) -> Self {
@@ -104,9 +108,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             session_builder,
             codec,
             scheduler_name.clone(),
+            slots_policy,
         ));
 
-        SchedulerServer::new_with_state(scheduler_name, policy, state)
+        SchedulerServer::new_with_state(scheduler_name, scheduling_policy, state)
     }
 
     pub(crate) fn new_with_state(
@@ -294,6 +299,7 @@ mod test {
     };
     use ballista_core::error::Result;
 
+    use crate::config::SlotsPolicy;
     use ballista_core::serde::protobuf::{
         failed_task, job_status, task_status, ExecutionError, FailedTask, JobStatus,
         PhysicalPlanNode, ShuffleWritePartition, SuccessfulTask, TaskStatus,
@@ -753,14 +759,15 @@ mod test {
     }
 
     async fn test_scheduler(
-        policy: TaskSchedulingPolicy,
+        scheduling_policy: TaskSchedulingPolicy,
     ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new_with_policy(
                 "localhost:50050".to_owned(),
                 state_storage.clone(),
-                policy,
+                scheduling_policy,
+                SlotsPolicy::Bias,
                 BallistaCodec::default(),
                 default_session_builder,
             );
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 70fc7a0a..2da288fe 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -23,6 +23,7 @@ use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf;
 
+use crate::config::SlotsPolicy;
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
@@ -84,6 +85,8 @@ pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
 
 #[derive(Clone)]
 pub(crate) struct ExecutorManager {
+    // executor slot policy
+    slots_policy: SlotsPolicy,
     state: Arc<dyn StateBackendClient>,
     // executor_id -> ExecutorMetadata map
     executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
@@ -95,8 +98,12 @@ pub(crate) struct ExecutorManager {
 }
 
 impl ExecutorManager {
-    pub(crate) fn new(state: Arc<dyn StateBackendClient>) -> Self {
+    pub(crate) fn new(
+        state: Arc<dyn StateBackendClient>,
+        slots_policy: SlotsPolicy,
+    ) -> Self {
         Self {
+            slots_policy,
             state,
             executor_metadata: Arc::new(DashMap::new()),
             executors_heartbeat: Arc::new(DashMap::new()),
@@ -121,38 +128,28 @@ impl ExecutorManager {
     /// for scheduling.
     /// This operation is atomic, so if this method return an Err, no slots have been reserved.
     pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+        self.reserve_slots_global(n).await
+    }
+
+    /// Reserve up to n executor task slots with considering the global resource snapshot
+    async fn reserve_slots_global(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
         let lock = self.state.lock(Keyspace::Slots, "global").await?;
 
         with_lock(lock, async {
             debug!("Attempting to reserve {} executor slots", n);
             let start = Instant::now();
-            let mut reservations: Vec<ExecutorReservation> = vec![];
-            let mut desired: u32 = n;
 
             let alive_executors = self.get_alive_executors_within_one_minute();
 
-            let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
-            for executor_id in alive_executors {
-                let value = self.state.get(Keyspace::Slots, &executor_id).await?;
-                let mut data =
-                    decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
-                let take = std::cmp::min(data.available_task_slots, desired);
-
-                for _ in 0..take {
-                    reservations.push(ExecutorReservation::new_free(executor_id.clone()));
-                    data.available_task_slots -= 1;
-                    desired -= 1;
+            let (reservations, txn_ops) = match self.slots_policy {
+                SlotsPolicy::Bias => {
+                    self.reserve_slots_global_bias(n, alive_executors).await?
                 }
-
-                let proto: protobuf::ExecutorData = data.into();
-                let new_data = encode_protobuf(&proto)?;
-                txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
-
-                if desired == 0 {
-                    break;
+                SlotsPolicy::RoundRobin => {
+                    self.reserve_slots_global_round_robin(n, alive_executors)
+                        .await?
                 }
-            }
+            };
 
             self.state.apply_txn(txn_ops).await?;
 
@@ -168,6 +165,112 @@ impl ExecutorManager {
         .await
     }
 
+    /// It will get ExecutorReservation from one executor as many as possible.
+    /// By this way, it can reduce the chance of decoding and encoding ExecutorData.
+    /// However, it may make the whole cluster unbalanced,
+    /// which means some executors may be very busy while other executors may be idle.
+    async fn reserve_slots_global_bias(
+        &self,
+        mut n: u32,
+        alive_executors: HashSet<String>,
+    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+        let mut reservations: Vec<ExecutorReservation> = vec![];
+        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+        for executor_id in alive_executors {
+            if n == 0 {
+                break;
+            }
+
+            let value = self.state.get(Keyspace::Slots, &executor_id).await?;
+            let mut data = decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
+            let take = std::cmp::min(data.available_task_slots, n);
+
+            for _ in 0..take {
+                reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+                data.available_task_slots -= 1;
+                n -= 1;
+            }
+
+            let proto: protobuf::ExecutorData = data.into();
+            let new_data = encode_protobuf(&proto)?;
+            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+        }
+
+        Ok((reservations, txn_ops))
+    }
+
+    /// Create ExecutorReservation in a round robin way to evenly assign tasks to executors
+    async fn reserve_slots_global_round_robin(
+        &self,
+        mut n: u32,
+        alive_executors: HashSet<String>,
+    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+        let mut reservations: Vec<ExecutorReservation> = vec![];
+        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+        let all_executor_data = self
+            .state
+            .scan(Keyspace::Slots, None)
+            .await?
+            .into_iter()
+            .map(|(_, data)| decode_into::<protobuf::ExecutorData, ExecutorData>(&data))
+            .collect::<Result<Vec<ExecutorData>>>()?;
+
+        let mut available_executor_data: Vec<ExecutorData> = all_executor_data
+            .into_iter()
+            .filter_map(|data| {
+                (data.available_task_slots > 0
+                    && alive_executors.contains(&data.executor_id))
+                .then_some(data)
+            })
+            .collect();
+        available_executor_data
+            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+
+        // Exclusive
+        let mut last_updated_idx = 0usize;
+        loop {
+            let n_before = n;
+            for (idx, data) in available_executor_data.iter_mut().enumerate() {
+                if n == 0 {
+                    break;
+                }
+
+                // Since the vector is sorted in descending order,
+                // if finding one executor has not enough slots, the following will have not enough, either
+                if data.available_task_slots == 0 {
+                    break;
+                }
+
+                reservations
+                    .push(ExecutorReservation::new_free(data.executor_id.clone()));
+                data.available_task_slots -= 1;
+                n -= 1;
+
+                if idx >= last_updated_idx {
+                    last_updated_idx = idx + 1;
+                }
+            }
+
+            if n_before == n {
+                break;
+            }
+        }
+
+        for (idx, data) in available_executor_data.into_iter().enumerate() {
+            if idx >= last_updated_idx {
+                break;
+            }
+            let executor_id = data.executor_id.clone();
+            let proto: protobuf::ExecutorData = data.into();
+            let new_data = encode_protobuf(&proto)?;
+            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+        }
+
+        Ok((reservations, txn_ops))
+    }
+
     /// Returned reserved task slots to the pool of available slots. This operation is atomic
     /// so either the entire pool of reserved task slots it returned or none are.
     pub async fn cancel_reservations(
@@ -638,6 +741,7 @@ impl ExecutorHeartbeatListener {
 
 #[cfg(test)]
 mod test {
+    use crate::config::SlotsPolicy;
     use crate::state::backend::standalone::StandaloneClient;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
     use ballista_core::error::Result;
@@ -648,9 +752,16 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_and_cancel() -> Result<()> {
+        test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
+        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         let executors = test_executors(10, 4);
 
@@ -678,9 +789,16 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_partial() -> Result<()> {
+        test_reserve_partial_inner(SlotsPolicy::Bias).await?;
+        test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         let executors = test_executors(10, 4);
 
@@ -720,6 +838,13 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_concurrent() -> Result<()> {
+        test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
+        test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let (sender, mut receiver) =
             tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
 
@@ -727,7 +852,7 @@ mod test {
 
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -762,9 +887,16 @@ mod test {
 
     #[tokio::test]
     async fn test_register_reserve() -> Result<()> {
+        test_register_reserve_inner(SlotsPolicy::Bias).await?;
+        test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         let executors = test_executors(10, 4);
 
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3de58a0b..6943cbd5 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -28,6 +28,7 @@ use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::TaskManager;
 
+use crate::config::SlotsPolicy;
 use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
@@ -101,6 +102,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             session_builder,
             codec,
             "localhost:50050".to_owned(),
+            SlotsPolicy::Bias,
         )
     }
 
@@ -109,9 +111,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
+        slots_policy: SlotsPolicy,
     ) -> Self {
         Self {
-            executor_manager: ExecutorManager::new(config_client.clone()),
+            executor_manager: ExecutorManager::new(config_client.clone(), slots_policy),
             task_manager: TaskManager::new(
                 config_client.clone(),
                 session_builder,