Posted to commits@arrow.apache.org by ya...@apache.org on 2023/05/06 07:13:27 UTC

[arrow-ballista] branch main updated: [Improve] refactor the offer_reservation avoid wait result (#760)

This is an automated email from the ASF dual-hosted git repository.

yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 30232e00 [Improve] refactor the offer_reservation avoid wait result (#760)
30232e00 is described below

commit 30232e00d442b62c830bd5b9571bf69aa34b38b3
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sat May 6 15:13:23 2023 +0800

    [Improve] refactor the offer_reservation avoid wait result (#760)
---
 ballista/scheduler/src/state/mod.rs | 193 ++++++++++++++++++------------------
 1 file changed, 97 insertions(+), 96 deletions(-)
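
For context on the diff below: before this change, offer_reservations spawned one launch task per executor and then awaited all of the join handles, so the caller blocked until every launch attempt had finished. The refactor moves the whole launch loop into a single fire-and-forget tokio::spawn and returns any failed slots to the pool from inside that spawned task, so offer_reservations no longer waits on launch results. The sketch below illustrates that pattern in isolation; launch_on_executor, offer_reservations_sketch, and the plain String executor ids are simplified stand-ins rather than the actual Ballista types, and it assumes a tokio runtime dependency.

    use std::time::Duration;

    // Hypothetical stand-in for launching all tasks on one executor.
    async fn launch_on_executor(executor_id: &str) -> Result<(), String> {
        tokio::time::sleep(Duration::from_millis(10)).await;
        if executor_id == "bad-executor" {
            Err("launch failed".to_string())
        } else {
            Ok(())
        }
    }

    // Fire-and-forget version: spawn the launch loop and return immediately.
    // Failures are handled inside the spawned task by collecting the slots to
    // give back (the real code builds ExecutorReservation::new_free values and
    // calls executor_manager.cancel_reservations).
    fn offer_reservations_sketch(executor_ids: Vec<String>) {
        tokio::spawn(async move {
            let mut freed_slots = Vec::new();
            for id in executor_ids {
                if let Err(e) = launch_on_executor(&id).await {
                    eprintln!("Failed to launch on {id}: {e}");
                    freed_slots.push(id);
                }
            }
            if !freed_slots.is_empty() {
                eprintln!("Returning {} slot(s) to the pool", freed_slots.len());
            }
        });
    }

    #[tokio::main]
    async fn main() {
        offer_reservations_sketch(vec!["exec-1".into(), "bad-executor".into()]);
        // Give the detached task time to run, mirroring the sleep added to the test below.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }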

diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 31675cbe..483828cc 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -195,112 +195,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         &self,
         reservations: Vec<ExecutorReservation>,
     ) -> Result<(Vec<ExecutorReservation>, usize)> {
-        let (free_list, pending_tasks) = match self
-            .task_manager
-            .fill_reservations(&reservations)
-            .await
+        let pending_tasks = match self.task_manager.fill_reservations(&reservations).await
         {
-            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
-                // Put tasks to the same executor together
-                // And put tasks belonging to the same stage together for creating MultiTaskDefinition
-                let mut executor_stage_assignments: HashMap<
-                    String,
-                    HashMap<(String, usize), Vec<TaskDescription>>,
-                > = HashMap::new();
-                for (executor_id, task) in assignments.into_iter() {
-                    let stage_key =
-                        (task.partition.job_id.clone(), task.partition.stage_id);
-                    if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
-                    {
-                        if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
-                            executor_stage_tasks.push(task);
-                        } else {
-                            tasks.insert(stage_key, vec![task]);
-                        }
-                    } else {
-                        let mut executor_stage_tasks: HashMap<
-                            (String, usize),
-                            Vec<TaskDescription>,
-                        > = HashMap::new();
-                        executor_stage_tasks.insert(stage_key, vec![task]);
-                        executor_stage_assignments
-                            .insert(executor_id, executor_stage_tasks);
-                    }
-                }
-
-                let mut join_handles = vec![];
-                for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-                    let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
-                    // Total number of tasks to be launched for one executor
-                    let n_tasks: usize =
-                        tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
-
-                    let task_manager = self.task_manager.clone();
-                    let executor_manager = self.executor_manager.clone();
-                    let join_handle = tokio::spawn(async move {
-                        let success = match executor_manager
-                            .get_executor_metadata(&executor_id)
-                            .await
-                        {
-                            Ok(executor) => {
-                                if let Err(e) = task_manager
-                                    .launch_multi_task(
-                                        &executor,
-                                        tasks,
-                                        &executor_manager,
-                                    )
-                                    .await
-                                {
-                                    error!("Failed to launch new task: {:?}", e);
-                                    false
-                                } else {
-                                    true
-                                }
-                            }
-                            Err(e) => {
-                                error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                                false
-                            }
-                        };
-                        if success {
-                            vec![]
-                        } else {
-                            vec![
-                                ExecutorReservation::new_free(executor_id.clone(),);
-                                n_tasks
-                            ]
-                        }
-                    });
-                    join_handles.push(join_handle);
-                }
+            Ok((assignments, unassigned_reservations, pending_tasks)) => {
+                let executor_stage_assignments = Self::combine_task(assignments);
 
-                let unassigned_executor_reservations =
-                    futures::future::join_all(join_handles)
-                        .await
-                        .into_iter()
-                        .collect::<std::result::Result<
-                        Vec<Vec<ExecutorReservation>>,
-                        tokio::task::JoinError,
-                    >>()?;
-                unassigned_reservations.append(
-                    &mut unassigned_executor_reservations
-                        .into_iter()
-                        .flatten()
-                        .collect::<Vec<ExecutorReservation>>(),
+                self.spawn_tasks_and_persist_reservations_back(
+                    executor_stage_assignments,
+                    unassigned_reservations,
                 );
-                (unassigned_reservations, pending_tasks)
+
+                pending_tasks
             }
+            // If filling reservations failed, return them all to the pool
             Err(e) => {
                 error!("Error filling reservations: {:?}", e);
-                (reservations, 0)
+                self.executor_manager
+                    .cancel_reservations(reservations)
+                    .await?;
+                0
             }
         };
 
         let mut new_reservations = vec![];
-        if !free_list.is_empty() {
-            // If any reserved slots remain, return them to the pool
-            self.executor_manager.cancel_reservations(free_list).await?;
-        } else if pending_tasks > 0 {
+
+        if pending_tasks > 0 {
             // If there are pending tasks available, try and schedule them
             let pending_reservations = self
                 .executor_manager
@@ -312,6 +231,86 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         Ok((new_reservations, pending_tasks))
     }
 
+    fn spawn_tasks_and_persist_reservations_back(
+        &self,
+        executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        >,
+        mut unassigned_reservations: Vec<ExecutorReservation>,
+    ) {
+        let task_manager = self.task_manager.clone();
+        let executor_manager = self.executor_manager.clone();
+
+        tokio::spawn(async move {
+            for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+                let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+                // Total number of tasks to be launched for one executor
+                let n_tasks: usize =
+                    tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
+                match executor_manager.get_executor_metadata(&executor_id).await {
+                    Ok(executor) => {
+                        if let Err(e) = task_manager
+                            .launch_multi_task(&executor, tasks, &executor_manager)
+                            .await
+                        {
+                            error!("Failed to launch new task: {:?}", e);
+                            // Launch failed: return this executor's reserved slots to the pool.
+                            unassigned_reservations.append(&mut vec![
+                                    ExecutorReservation::new_free(
+                                        executor_id.clone(),
+                                    );
+                                    n_tasks
+                                ]);
+                        }
+                    }
+                    Err(e) => {
+                        error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                        // No need to return the reservations here.
+                    }
+                };
+            }
+            if !unassigned_reservations.is_empty() {
+                // If any reserved slots remain, return them to the pool
+                executor_manager
+                    .cancel_reservations(unassigned_reservations)
+                    .await
+                    .expect("cancel_reservations failed");
+            }
+        });
+    }
+
+    // Group tasks assigned to the same executor together, and within each executor
+    // group tasks belonging to the same stage together for creating a MultiTaskDefinition.
+    // Returns a map of <executor_id, <stage_key, Vec<TaskDescription>>>.
+    fn combine_task(
+        assignments: Vec<(String, TaskDescription)>,
+    ) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
+        let mut executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        > = HashMap::new();
+        for (executor_id, task) in assignments.into_iter() {
+            let stage_key = (task.partition.job_id.clone(), task.partition.stage_id);
+            if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id) {
+                if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+                    executor_stage_tasks.push(task);
+                } else {
+                    tasks.insert(stage_key, vec![task]);
+                }
+            } else {
+                let mut executor_stage_tasks: HashMap<
+                    (String, usize),
+                    Vec<TaskDescription>,
+                > = HashMap::new();
+                executor_stage_tasks.insert(stage_key, vec![task]);
+                executor_stage_assignments.insert(executor_id, executor_stage_tasks);
+            }
+        }
+        executor_stage_assignments
+    }
+
     pub(crate) async fn submit_job(
         &self,
         job_id: &str,
@@ -459,6 +458,8 @@ mod test {
         assert_eq!(assigned, 0);
         assert!(result.is_empty());
 
+        // Sleep to give the spawned launch task time to finish.
+        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
         // All reservations should have been cancelled so we should be able to reserve them now
         let reservations = state.executor_manager.reserve_slots(4).await?;
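
A note on the new combine_task helper added above: it turns the flat list of (executor_id, TaskDescription) assignments into a nested map keyed first by executor and then by (job_id, stage_id), so that all tasks of one stage on one executor can be launched together as a MultiTaskDefinition. The standalone sketch below shows the same grouping using the HashMap entry API and a simplified task struct in place of TaskDescription; it illustrates the shape of the data, not the Ballista implementation.

    use std::collections::HashMap;

    // Simplified stand-in for TaskDescription.
    #[derive(Debug)]
    struct Task {
        job_id: String,
        stage_id: usize,
    }

    // Group (executor_id, task) pairs by executor, then by (job_id, stage_id).
    fn combine_tasks(
        assignments: Vec<(String, Task)>,
    ) -> HashMap<String, HashMap<(String, usize), Vec<Task>>> {
        let mut grouped: HashMap<String, HashMap<(String, usize), Vec<Task>>> =
            HashMap::new();
        for (executor_id, task) in assignments {
            let stage_key = (task.job_id.clone(), task.stage_id);
            grouped
                .entry(executor_id) // one inner map per executor
                .or_default()
                .entry(stage_key) // one Vec per (job_id, stage_id)
                .or_default()
                .push(task);
        }
        grouped
    }

    fn main() {
        let assignments = vec![
            ("exec-1".to_string(), Task { job_id: "job-a".into(), stage_id: 1 }),
            ("exec-1".to_string(), Task { job_id: "job-a".into(), stage_id: 1 }),
            ("exec-2".to_string(), Task { job_id: "job-a".into(), stage_id: 2 }),
        ];
        // exec-1 maps to one stage group holding two tasks; exec-2 to one stage group with a single task.
        println!("{:?}", combine_tasks(assignments));
    }

Using the entry API keeps the grouping to a single map lookup per assignment, whereas the committed version reaches the same result with explicit get_mut/insert branches.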