You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/21 00:59:33 UTC

[arrow-ballista] branch master updated: Improve launch task efficiency by calling LaunchMultiTask (#394)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 93df7446 Improve launch task efficiency by calling LaunchMultiTask (#394)
93df7446 is described below

commit 93df7446aeacd8f47877f0daa6e2f31f2eefa719
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Fri Oct 21 08:59:27 2022 +0800

    Improve launch task efficiency by calling LaunchMultiTask (#394)
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/scheduler/src/state/mod.rs          |  57 +++++++++++--
 ballista/scheduler/src/state/task_manager.rs | 115 ++++++++++++++++++++++++++-
 2 files changed, 164 insertions(+), 8 deletions(-)

diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 57fdc3c0..3de58a0b 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::any::type_name;
+use std::collections::HashMap;
 use std::future::Future;
 use std::sync::Arc;
 use std::time::Instant;
@@ -27,6 +28,7 @@ use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::TaskManager;
 
+use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
@@ -196,7 +198,39 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             .await
         {
             Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
+                // Put tasks to the same executor together
+                // And put tasks belonging to the same stage together for creating MultiTaskDefinition
+                let mut executor_stage_assignments: HashMap<
+                    String,
+                    HashMap<(String, usize), Vec<TaskDescription>>,
+                > = HashMap::new();
                 for (executor_id, task) in assignments.into_iter() {
+                    let stage_key =
+                        (task.partition.job_id.clone(), task.partition.stage_id);
+                    if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
+                    {
+                        if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+                            executor_stage_tasks.push(task);
+                        } else {
+                            tasks.insert(stage_key, vec![task]);
+                        }
+                    } else {
+                        let mut executor_stage_tasks: HashMap<
+                            (String, usize),
+                            Vec<TaskDescription>,
+                        > = HashMap::new();
+                        executor_stage_tasks.insert(stage_key, vec![task]);
+                        executor_stage_assignments
+                            .insert(executor_id, executor_stage_tasks);
+                    }
+                }
+
+                for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+                    let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+                    // Total number of tasks to be launched for one executor
+                    let n_tasks: usize =
+                        tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
                     match self
                         .executor_manager
                         .get_executor_metadata(&executor_id)
@@ -205,19 +239,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                         Ok(executor) => {
                             if let Err(e) = self
                                 .task_manager
-                                .launch_task(&executor, task, &self.executor_manager)
+                                .launch_multi_task(
+                                    &executor,
+                                    tasks,
+                                    &self.executor_manager,
+                                )
                                 .await
                             {
                                 error!("Failed to launch new task: {:?}", e);
-                                unassigned_reservations.push(
-                                    ExecutorReservation::new_free(executor_id.clone()),
-                                );
+                                for _i in 0..n_tasks {
+                                    unassigned_reservations.push(
+                                        ExecutorReservation::new_free(
+                                            executor_id.clone(),
+                                        ),
+                                    );
+                                }
                             }
                         }
                         Err(e) => {
                             error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                            unassigned_reservations
-                                .push(ExecutorReservation::new_free(executor_id.clone()));
+                            for _i in 0..n_tasks {
+                                unassigned_reservations.push(
+                                    ExecutorReservation::new_free(executor_id.clone()),
+                                );
+                            }
                         }
                     }
                 }
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 841c5421..fed7cce3 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,7 +29,8 @@ use ballista_core::error::Result;
 
 use crate::state::session_manager::create_datafusion_context;
 use ballista_core::serde::protobuf::{
-    self, job_status, FailedJob, JobStatus, TaskDefinition, TaskStatus,
+    self, job_status, FailedJob, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId,
+    TaskStatus,
 };
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
 use ballista_core::serde::scheduler::ExecutorMetadata;
@@ -518,6 +519,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         .await
     }
 
+    #[allow(dead_code)]
     #[cfg(not(test))]
     /// Launch the given task on the specified executor
     pub(crate) async fn launch_task(
@@ -544,8 +546,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         Ok(())
     }
 
-    /// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
+    #[allow(dead_code)]
     #[cfg(test)]
+    /// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
     pub(crate) async fn launch_task(
         &self,
         _executor: &ExecutorMetadata,
@@ -623,6 +626,114 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         }
     }
 
+    #[cfg(not(test))]
+    /// Launch the given tasks on the specified executor
+    pub(crate) async fn launch_multi_task(
+        &self,
+        executor: &ExecutorMetadata,
+        tasks: Vec<Vec<TaskDescription>>,
+        executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        info!("Launching multi task on executor {:?}", executor.id);
+        let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
+            .into_iter()
+            .map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
+            .collect();
+        let multi_tasks = multi_tasks?;
+        let mut client = executor_manager.get_client(&executor.id).await?;
+        client
+            .launch_multi_task(protobuf::LaunchMultiTaskParams {
+                multi_tasks,
+                scheduler_id: self.scheduler_id.clone(),
+            })
+            .await
+            .map_err(|e| {
+                BallistaError::Internal(format!(
+                    "Failed to connect to executor {}: {:?}",
+                    executor.id, e
+                ))
+            })?;
+        Ok(())
+    }
+
+    #[cfg(test)]
+    /// Launch the given tasks on the specified executor
+    pub(crate) async fn launch_multi_task(
+        &self,
+        _executor: &ExecutorMetadata,
+        _tasks: Vec<Vec<TaskDescription>>,
+        _executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    #[allow(dead_code)]
+    pub fn prepare_multi_task_definition(
+        &self,
+        tasks: Vec<TaskDescription>,
+    ) -> Result<MultiTaskDefinition> {
+        debug!("Preparing multi task definition for {:?}", tasks);
+        if let Some(task) = tasks.get(0) {
+            let session_id = task.session_id.clone();
+            let job_id = task.partition.job_id.clone();
+            let stage_id = task.partition.stage_id;
+            let stage_attempt_num = task.stage_attempt_num;
+
+            if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
+                let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
+                {
+                    plan.clone()
+                } else {
+                    let mut plan_buf: Vec<u8> = vec![];
+                    let plan_proto = U::try_from_physical_plan(
+                        task.plan.clone(),
+                        self.codec.physical_extension_codec(),
+                    )?;
+                    plan_proto.try_encode(&mut plan_buf)?;
+
+                    job_info
+                        .encoded_stage_plans
+                        .insert(stage_id, plan_buf.clone());
+
+                    plan_buf
+                };
+                let output_partitioning =
+                    hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
+
+                let task_ids = tasks
+                    .iter()
+                    .map(|task| TaskId {
+                        task_id: task.task_id as u32,
+                        task_attempt_num: task.task_attempt as u32,
+                        partition_id: task.partition.partition_id as u32,
+                    })
+                    .collect();
+
+                let multi_task_definition = MultiTaskDefinition {
+                    task_ids,
+                    job_id,
+                    stage_id: stage_id as u32,
+                    stage_attempt_num: stage_attempt_num as u32,
+                    plan,
+                    output_partitioning,
+                    session_id,
+                    launch_time: SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_millis() as u64,
+                    props: vec![],
+                };
+                Ok(multi_task_definition)
+            } else {
+                Err(BallistaError::General(format!("Cannot prepare multi task definition for job {} which is not in active cache", job_id)))
+            }
+        } else {
+            Err(BallistaError::General(
+                "Cannot prepare multi task definition for an empty vec".to_string(),
+            ))
+        }
+    }
+
     /// Get the `ExecutionGraph` for the given job ID from cache
     pub(crate) async fn get_active_execution_graph(
         &self,