You are viewing a plain text version of this content; the canonical link to the original (HTML) version was removed during conversion to plain text.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/21 00:59:33 UTC
[arrow-ballista] branch master updated: Improve launch task efficiency by calling LaunchMultiTask (#394)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 93df7446 Improve launch task efficiency by calling LaunchMultiTask (#394)
93df7446 is described below
commit 93df7446aeacd8f47877f0daa6e2f31f2eefa719
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Fri Oct 21 08:59:27 2022 +0800
Improve launch task efficiency by calling LaunchMultiTask (#394)
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/scheduler/src/state/mod.rs | 57 +++++++++++--
ballista/scheduler/src/state/task_manager.rs | 115 ++++++++++++++++++++++++++-
2 files changed, 164 insertions(+), 8 deletions(-)
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 57fdc3c0..3de58a0b 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -16,6 +16,7 @@
// under the License.
use std::any::type_name;
+use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Instant;
@@ -27,6 +28,7 @@ use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::session_manager::SessionManager;
use crate::state::task_manager::TaskManager;
+use crate::state::execution_graph::TaskDescription;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
@@ -196,7 +198,39 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
.await
{
Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
+ // Put tasks to the same executor together
+ // And put tasks belonging to the same stage together for creating MultiTaskDefinition
+ let mut executor_stage_assignments: HashMap<
+ String,
+ HashMap<(String, usize), Vec<TaskDescription>>,
+ > = HashMap::new();
for (executor_id, task) in assignments.into_iter() {
+ let stage_key =
+ (task.partition.job_id.clone(), task.partition.stage_id);
+ if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
+ {
+ if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+ executor_stage_tasks.push(task);
+ } else {
+ tasks.insert(stage_key, vec![task]);
+ }
+ } else {
+ let mut executor_stage_tasks: HashMap<
+ (String, usize),
+ Vec<TaskDescription>,
+ > = HashMap::new();
+ executor_stage_tasks.insert(stage_key, vec![task]);
+ executor_stage_assignments
+ .insert(executor_id, executor_stage_tasks);
+ }
+ }
+
+ for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+ let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+ // Total number of tasks to be launched for one executor
+ let n_tasks: usize =
+ tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
match self
.executor_manager
.get_executor_metadata(&executor_id)
@@ -205,19 +239,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
Ok(executor) => {
if let Err(e) = self
.task_manager
- .launch_task(&executor, task, &self.executor_manager)
+ .launch_multi_task(
+ &executor,
+ tasks,
+ &self.executor_manager,
+ )
.await
{
error!("Failed to launch new task: {:?}", e);
- unassigned_reservations.push(
- ExecutorReservation::new_free(executor_id.clone()),
- );
+ for _i in 0..n_tasks {
+ unassigned_reservations.push(
+ ExecutorReservation::new_free(
+ executor_id.clone(),
+ ),
+ );
+ }
}
}
Err(e) => {
error!("Failed to launch new task, could not get executor metadata: {:?}", e);
- unassigned_reservations
- .push(ExecutorReservation::new_free(executor_id.clone()));
+ for _i in 0..n_tasks {
+ unassigned_reservations.push(
+ ExecutorReservation::new_free(executor_id.clone()),
+ );
+ }
}
}
}
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 841c5421..fed7cce3 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,7 +29,8 @@ use ballista_core::error::Result;
use crate::state::session_manager::create_datafusion_context;
use ballista_core::serde::protobuf::{
- self, job_status, FailedJob, JobStatus, TaskDefinition, TaskStatus,
+ self, job_status, FailedJob, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId,
+ TaskStatus,
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::scheduler::ExecutorMetadata;
@@ -518,6 +519,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
.await
}
+ #[allow(dead_code)]
#[cfg(not(test))]
/// Launch the given task on the specified executor
pub(crate) async fn launch_task(
@@ -544,8 +546,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
Ok(())
}
- /// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
+ #[allow(dead_code)]
#[cfg(test)]
+ /// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
pub(crate) async fn launch_task(
&self,
_executor: &ExecutorMetadata,
@@ -623,6 +626,114 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
}
}
+ #[cfg(not(test))]
+ /// Launch the given tasks on the specified executor
+ pub(crate) async fn launch_multi_task(
+ &self,
+ executor: &ExecutorMetadata,
+ tasks: Vec<Vec<TaskDescription>>,
+ executor_manager: &ExecutorManager,
+ ) -> Result<()> {
+ info!("Launching multi task on executor {:?}", executor.id);
+ let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
+ .into_iter()
+ .map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
+ .collect();
+ let multi_tasks = multi_tasks?;
+ let mut client = executor_manager.get_client(&executor.id).await?;
+ client
+ .launch_multi_task(protobuf::LaunchMultiTaskParams {
+ multi_tasks,
+ scheduler_id: self.scheduler_id.clone(),
+ })
+ .await
+ .map_err(|e| {
+ BallistaError::Internal(format!(
+ "Failed to connect to executor {}: {:?}",
+ executor.id, e
+ ))
+ })?;
+ Ok(())
+ }
+
+ #[cfg(test)]
+ /// Launch the given tasks on the specified executor
+ pub(crate) async fn launch_multi_task(
+ &self,
+ _executor: &ExecutorMetadata,
+ _tasks: Vec<Vec<TaskDescription>>,
+ _executor_manager: &ExecutorManager,
+ ) -> Result<()> {
+ Ok(())
+ }
+
+ #[allow(dead_code)]
+ pub fn prepare_multi_task_definition(
+ &self,
+ tasks: Vec<TaskDescription>,
+ ) -> Result<MultiTaskDefinition> {
+ debug!("Preparing multi task definition for {:?}", tasks);
+ if let Some(task) = tasks.get(0) {
+ let session_id = task.session_id.clone();
+ let job_id = task.partition.job_id.clone();
+ let stage_id = task.partition.stage_id;
+ let stage_attempt_num = task.stage_attempt_num;
+
+ if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
+ let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
+ {
+ plan.clone()
+ } else {
+ let mut plan_buf: Vec<u8> = vec![];
+ let plan_proto = U::try_from_physical_plan(
+ task.plan.clone(),
+ self.codec.physical_extension_codec(),
+ )?;
+ plan_proto.try_encode(&mut plan_buf)?;
+
+ job_info
+ .encoded_stage_plans
+ .insert(stage_id, plan_buf.clone());
+
+ plan_buf
+ };
+ let output_partitioning =
+ hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
+
+ let task_ids = tasks
+ .iter()
+ .map(|task| TaskId {
+ task_id: task.task_id as u32,
+ task_attempt_num: task.task_attempt as u32,
+ partition_id: task.partition.partition_id as u32,
+ })
+ .collect();
+
+ let multi_task_definition = MultiTaskDefinition {
+ task_ids,
+ job_id,
+ stage_id: stage_id as u32,
+ stage_attempt_num: stage_attempt_num as u32,
+ plan,
+ output_partitioning,
+ session_id,
+ launch_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64,
+ props: vec![],
+ };
+ Ok(multi_task_definition)
+ } else {
+ Err(BallistaError::General(format!("Cannot prepare multi task definition for job {} which is not in active cache", job_id)))
+ }
+ } else {
+ Err(BallistaError::General(
+ "Cannot prepare multi task definition for an empty vec".to_string(),
+ ))
+ }
+ }
+
/// Get the `ExecutionGraph` for the given job ID from cache
pub(crate) async fn get_active_execution_graph(
&self,