You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2022/12/13 20:35:26 UTC
[arrow-ballista] branch master updated: Make it concurrently to launch tasks to executors (#557)
This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 7c7a9979 Make it concurrently to launch tasks to executors (#557)
7c7a9979 is described below
commit 7c7a997912fb38702c50974a799d779636580f94
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Dec 14 04:35:21 2022 +0800
Make it concurrently to launch tasks to executors (#557)
* Make it concurrently to launch tasks to executors
* Refine for comments
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/scheduler/src/state/mod.rs | 79 +++++++++++++++++++++++--------------
1 file changed, 49 insertions(+), 30 deletions(-)
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3c580280..0671fdbe 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -236,47 +236,66 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
}
}
+ let mut join_handles = vec![];
for (executor_id, tasks) in executor_stage_assignments.into_iter() {
let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
// Total number of tasks to be launched for one executor
let n_tasks: usize =
tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
- match self
- .executor_manager
- .get_executor_metadata(&executor_id)
- .await
- {
- Ok(executor) => {
- if let Err(e) = self
- .task_manager
- .launch_multi_task(
- &executor,
- tasks,
- &self.executor_manager,
- )
- .await
- {
- error!("Failed to launch new task: {:?}", e);
- for _i in 0..n_tasks {
- unassigned_reservations.push(
- ExecutorReservation::new_free(
- executor_id.clone(),
- ),
- );
+ let task_manager = self.task_manager.clone();
+ let executor_manager = self.executor_manager.clone();
+ let join_handle = tokio::spawn(async move {
+ let success = match executor_manager
+ .get_executor_metadata(&executor_id)
+ .await
+ {
+ Ok(executor) => {
+ if let Err(e) = task_manager
+ .launch_multi_task(
+ &executor,
+ tasks,
+ &executor_manager,
+ )
+ .await
+ {
+ error!("Failed to launch new task: {:?}", e);
+ false
+ } else {
+ true
}
}
- }
- Err(e) => {
- error!("Failed to launch new task, could not get executor metadata: {:?}", e);
- for _i in 0..n_tasks {
- unassigned_reservations.push(
- ExecutorReservation::new_free(executor_id.clone()),
- );
+ Err(e) => {
+ error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+ false
}
+ };
+ if success {
+ vec![]
+ } else {
+ vec![
+ ExecutorReservation::new_free(executor_id.clone(),);
+ n_tasks
+ ]
}
- }
+ });
+ join_handles.push(join_handle);
}
+
+ let unassigned_executor_reservations =
+ futures::future::join_all(join_handles)
+ .await
+ .into_iter()
+ .collect::<std::result::Result<
+ Vec<Vec<ExecutorReservation>>,
+ tokio::task::JoinError,
+ >>()?;
+ unassigned_reservations.append(
+ &mut unassigned_executor_reservations
+ .into_iter()
+ .flatten()
+ .collect::<Vec<ExecutorReservation>>(),
+ );
(unassigned_reservations, pending_tasks)
}
Err(e) => {