Posted to commits@arrow.apache.org by ya...@apache.org on 2023/05/06 07:13:27 UTC
[arrow-ballista] branch main updated: [Improve] refactor the offer_reservation avoid wait result (#760)
This is an automated email from the ASF dual-hosted git repository.
yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 30232e00 [Improve] refactor the offer_reservation avoid wait result (#760)
30232e00 is described below
commit 30232e00d442b62c830bd5b9571bf69aa34b38b3
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sat May 6 15:13:23 2023 +0800
[Improve] refactor the offer_reservation avoid wait result (#760)
---
ballista/scheduler/src/state/mod.rs | 193 ++++++++++++++++++------------------
1 file changed, 97 insertions(+), 96 deletions(-)
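For context: before this change, offer_reservation spawned one launch task per executor, collected the join handles, and awaited them all with join_all before returning, so every call waited on the slowest launch. After the refactor, the whole launch loop runs inside a single detached tokio::spawn that returns any freed reservations to the pool itself, and offer_reservation no longer waits on the results. Below is a minimal, self-contained sketch contrasting the two patterns; launch, return_to_pool, and Reservation are hypothetical stand-ins rather than the actual Ballista API, and it assumes tokio = "1" with the rt-multi-thread, time, and macros features.

use std::time::Duration;

// Hypothetical stand-in for an executor slot reservation.
#[derive(Debug, Clone)]
struct Reservation {
    executor_id: String,
}

// Hypothetical stand-in for launching tasks on one executor.
async fn launch(executor_id: &str) -> Result<(), String> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    if executor_id == "bad-executor" {
        Err(format!("could not reach {executor_id}"))
    } else {
        Ok(())
    }
}

// Hypothetical stand-in for handing unused reservations back to the pool.
async fn return_to_pool(freed: Vec<Reservation>) {
    println!("returned {} reservation(s) to the pool", freed.len());
}

// Before: one spawned task per executor, all awaited via their join
// handles, so the caller blocks until every launch has completed.
async fn offer_blocking(executors: Vec<String>) {
    let handles: Vec<_> = executors
        .into_iter()
        .map(|id| {
            tokio::spawn(async move {
                match launch(&id).await {
                    Ok(()) => vec![],
                    Err(_) => vec![Reservation { executor_id: id }],
                }
            })
        })
        .collect();
    let mut freed = vec![];
    for handle in handles {
        freed.extend(handle.await.expect("launch task panicked"));
    }
    return_to_pool(freed).await;
}

// After: the loop runs in one detached task; the caller returns
// immediately and the background task gives freed slots back itself.
fn offer_detached(executors: Vec<String>) {
    tokio::spawn(async move {
        let mut freed = vec![];
        for id in executors {
            if launch(&id).await.is_err() {
                freed.push(Reservation { executor_id: id });
            }
        }
        if !freed.is_empty() {
            return_to_pool(freed).await;
        }
    });
}

#[tokio::main]
async fn main() {
    let executors = vec!["exec-1".to_string(), "bad-executor".to_string()];
    offer_blocking(executors.clone()).await;
    offer_detached(executors);
    // Give the detached task a moment to run, mirroring the sleep the
    // patch adds to the scheduler test.
    tokio::time::sleep(Duration::from_millis(50)).await;
}

The trade-off, visible in the test change at the bottom of this patch, is that the caller can no longer observe when launching has finished, which is why the test now sleeps before asserting that the reservations came back.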
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 31675cbe..483828cc 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -195,112 +195,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
&self,
reservations: Vec<ExecutorReservation>,
) -> Result<(Vec<ExecutorReservation>, usize)> {
- let (free_list, pending_tasks) = match self
- .task_manager
- .fill_reservations(&reservations)
- .await
+ let pending_tasks = match self.task_manager.fill_reservations(&reservations).await
{
- Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
- // Put tasks to the same executor together
- // And put tasks belonging to the same stage together for creating MultiTaskDefinition
- let mut executor_stage_assignments: HashMap<
- String,
- HashMap<(String, usize), Vec<TaskDescription>>,
- > = HashMap::new();
- for (executor_id, task) in assignments.into_iter() {
- let stage_key =
- (task.partition.job_id.clone(), task.partition.stage_id);
- if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
- {
- if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
- executor_stage_tasks.push(task);
- } else {
- tasks.insert(stage_key, vec![task]);
- }
- } else {
- let mut executor_stage_tasks: HashMap<
- (String, usize),
- Vec<TaskDescription>,
- > = HashMap::new();
- executor_stage_tasks.insert(stage_key, vec![task]);
- executor_stage_assignments
- .insert(executor_id, executor_stage_tasks);
- }
- }
-
- let mut join_handles = vec![];
- for (executor_id, tasks) in executor_stage_assignments.into_iter() {
- let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
- // Total number of tasks to be launched for one executor
- let n_tasks: usize =
- tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
-
- let task_manager = self.task_manager.clone();
- let executor_manager = self.executor_manager.clone();
- let join_handle = tokio::spawn(async move {
- let success = match executor_manager
- .get_executor_metadata(&executor_id)
- .await
- {
- Ok(executor) => {
- if let Err(e) = task_manager
- .launch_multi_task(
- &executor,
- tasks,
- &executor_manager,
- )
- .await
- {
- error!("Failed to launch new task: {:?}", e);
- false
- } else {
- true
- }
- }
- Err(e) => {
- error!("Failed to launch new task, could not get executor metadata: {:?}", e);
- false
- }
- };
- if success {
- vec![]
- } else {
- vec![
- ExecutorReservation::new_free(executor_id.clone(),);
- n_tasks
- ]
- }
- });
- join_handles.push(join_handle);
- }
+ Ok((assignments, unassigned_reservations, pending_tasks)) => {
+ let executor_stage_assignments = Self::combine_task(assignments);
- let unassigned_executor_reservations =
- futures::future::join_all(join_handles)
- .await
- .into_iter()
- .collect::<std::result::Result<
- Vec<Vec<ExecutorReservation>>,
- tokio::task::JoinError,
- >>()?;
- unassigned_reservations.append(
- &mut unassigned_executor_reservations
- .into_iter()
- .flatten()
- .collect::<Vec<ExecutorReservation>>(),
+ self.spawn_tasks_and_persist_reservations_back(
+ executor_stage_assignments,
+ unassigned_reservations,
);
- (unassigned_reservations, pending_tasks)
+
+ pending_tasks
}
+ // On error, return all reservations to the pool
Err(e) => {
error!("Error filling reservations: {:?}", e);
- (reservations, 0)
+ self.executor_manager
+ .cancel_reservations(reservations)
+ .await?;
+ 0
}
};
let mut new_reservations = vec![];
- if !free_list.is_empty() {
- // If any reserved slots remain, return them to the pool
- self.executor_manager.cancel_reservations(free_list).await?;
- } else if pending_tasks > 0 {
+
+ if pending_tasks > 0 {
// If there are pending tasks available, try and schedule them
let pending_reservations = self
.executor_manager
@@ -312,6 +231,86 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
Ok((new_reservations, pending_tasks))
}
+ fn spawn_tasks_and_persist_reservations_back(
+ &self,
+ executor_stage_assignments: HashMap<
+ String,
+ HashMap<(String, usize), Vec<TaskDescription>>,
+ >,
+ mut unassigned_reservations: Vec<ExecutorReservation>,
+ ) {
+ let task_manager = self.task_manager.clone();
+ let executor_manager = self.executor_manager.clone();
+
+ tokio::spawn(async move {
+ for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+ let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+ // Total number of tasks to be launched for one executor
+ let n_tasks: usize =
+ tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
+ match executor_manager.get_executor_metadata(&executor_id).await {
+ Ok(executor) => {
+ if let Err(e) = task_manager
+ .launch_multi_task(&executor, tasks, &executor_manager)
+ .await
+ {
+ error!("Failed to launch new task: {:?}", e);
+ // Launch failed, so give the reserved slots back.
+ unassigned_reservations.append(&mut vec![
+ ExecutorReservation::new_free(
+ executor_id.clone(),
+ );
+ n_tasks
+ ]);
+ }
+ }
+ Err(e) => {
+ error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+ // No need to return resources back here.
+ }
+ };
+ }
+ if !unassigned_reservations.is_empty() {
+ // If any reserved slots remain, return them to the pool
+ executor_manager
+ .cancel_reservations(unassigned_reservations)
+ .await
+ .expect("cancel_reservations fail!");
+ }
+ });
+ }
+
+ // Group tasks assigned to the same executor together, and within each
+ // executor group tasks of the same stage together for creating MultiTaskDefinition.
+ // Returns a map of <executor_id, <stage_key, Vec<TaskDescription>>>.
+ fn combine_task(
+ assignments: Vec<(String, TaskDescription)>,
+ ) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
+ let mut executor_stage_assignments: HashMap<
+ String,
+ HashMap<(String, usize), Vec<TaskDescription>>,
+ > = HashMap::new();
+ for (executor_id, task) in assignments.into_iter() {
+ let stage_key = (task.partition.job_id.clone(), task.partition.stage_id);
+ if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id) {
+ if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+ executor_stage_tasks.push(task);
+ } else {
+ tasks.insert(stage_key, vec![task]);
+ }
+ } else {
+ let mut executor_stage_tasks: HashMap<
+ (String, usize),
+ Vec<TaskDescription>,
+ > = HashMap::new();
+ executor_stage_tasks.insert(stage_key, vec![task]);
+ executor_stage_assignments.insert(executor_id, executor_stage_tasks);
+ }
+ }
+ executor_stage_assignments
+ }
+
pub(crate) async fn submit_job(
&self,
job_id: &str,
@@ -459,6 +458,8 @@ mod test {
assert_eq!(assigned, 0);
assert!(result.is_empty());
+ // Sleep to give the spawned background task time to finish its work.
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// All reservations should have been cancelled so we should be able to reserve them now
let reservations = state.executor_manager.reserve_slots(4).await?;
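A closing note on the new combine_task helper: the nested get_mut/insert bookkeeping it uses can be expressed more compactly with HashMap's entry API, with the same behavior. A minimal sketch of that alternative follows; TaskDescription is a simplified stand-in here (the real struct keeps job_id and stage_id under task.partition).

use std::collections::HashMap;

// Simplified stand-in: only the fields the grouping logic reads.
struct TaskDescription {
    job_id: String,
    stage_id: usize,
}

// Same grouping as combine_task: tasks keyed by executor, then by
// (job_id, stage_id); nested maps are created on demand by or_default().
fn combine_task(
    assignments: Vec<(String, TaskDescription)>,
) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
    let mut grouped: HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> =
        HashMap::new();
    for (executor_id, task) in assignments {
        let stage_key = (task.job_id.clone(), task.stage_id);
        grouped
            .entry(executor_id)
            .or_default()
            .entry(stage_key)
            .or_default()
            .push(task);
    }
    grouped
}

fn main() {
    let assignments = vec![
        ("exec-1".to_string(), TaskDescription { job_id: "job-a".into(), stage_id: 1 }),
        ("exec-1".to_string(), TaskDescription { job_id: "job-a".into(), stage_id: 1 }),
        ("exec-2".to_string(), TaskDescription { job_id: "job-a".into(), stage_id: 2 }),
    ];
    for (executor, stages) in combine_task(assignments) {
        println!("{executor}: {} stage group(s)", stages.len());
    }
}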