You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2022/12/13 20:35:26 UTC

[arrow-ballista] branch master updated: Make it concurrently to launch tasks to executors (#557)

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c7a9979 Make it concurrently to launch tasks to executors (#557)
7c7a9979 is described below

commit 7c7a997912fb38702c50974a799d779636580f94
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Dec 14 04:35:21 2022 +0800

    Make it concurrently to launch tasks to executors (#557)
    
    * Make it concurrently to launch tasks to executors
    
    * Refine for comments
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/scheduler/src/state/mod.rs | 79 +++++++++++++++++++++++--------------
 1 file changed, 49 insertions(+), 30 deletions(-)

diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3c580280..0671fdbe 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -236,47 +236,66 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                     }
                 }
 
+                let mut join_handles = vec![];
                 for (executor_id, tasks) in executor_stage_assignments.into_iter() {
                     let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
                     // Total number of tasks to be launched for one executor
                     let n_tasks: usize =
                         tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
 
-                    match self
-                        .executor_manager
-                        .get_executor_metadata(&executor_id)
-                        .await
-                    {
-                        Ok(executor) => {
-                            if let Err(e) = self
-                                .task_manager
-                                .launch_multi_task(
-                                    &executor,
-                                    tasks,
-                                    &self.executor_manager,
-                                )
-                                .await
-                            {
-                                error!("Failed to launch new task: {:?}", e);
-                                for _i in 0..n_tasks {
-                                    unassigned_reservations.push(
-                                        ExecutorReservation::new_free(
-                                            executor_id.clone(),
-                                        ),
-                                    );
+                    let task_manager = self.task_manager.clone();
+                    let executor_manager = self.executor_manager.clone();
+                    let join_handle = tokio::spawn(async move {
+                        let success = match executor_manager
+                            .get_executor_metadata(&executor_id)
+                            .await
+                        {
+                            Ok(executor) => {
+                                if let Err(e) = task_manager
+                                    .launch_multi_task(
+                                        &executor,
+                                        tasks,
+                                        &executor_manager,
+                                    )
+                                    .await
+                                {
+                                    error!("Failed to launch new task: {:?}", e);
+                                    false
+                                } else {
+                                    true
                                 }
                             }
-                        }
-                        Err(e) => {
-                            error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                            for _i in 0..n_tasks {
-                                unassigned_reservations.push(
-                                    ExecutorReservation::new_free(executor_id.clone()),
-                                );
+                            Err(e) => {
+                                error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                                false
                             }
+                        };
+                        if success {
+                            vec![]
+                        } else {
+                            vec![
+                                ExecutorReservation::new_free(executor_id.clone(),);
+                                n_tasks
+                            ]
                         }
-                    }
+                    });
+                    join_handles.push(join_handle);
                 }
+
+                let unassigned_executor_reservations =
+                    futures::future::join_all(join_handles)
+                        .await
+                        .into_iter()
+                        .collect::<std::result::Result<
+                        Vec<Vec<ExecutorReservation>>,
+                        tokio::task::JoinError,
+                    >>()?;
+                unassigned_reservations.append(
+                    &mut unassigned_executor_reservations
+                        .into_iter()
+                        .flatten()
+                        .collect::<Vec<ExecutorReservation>>(),
+                );
                 (unassigned_reservations, pending_tasks)
             }
             Err(e) => {