You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/04/12 19:42:11 UTC
[arrow-ballista] branch only_decode_once updated: Only decode plan once
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch only_decode_once
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/only_decode_once by this push:
new 28f60298 Only decode plan once
28f60298 is described below
commit 28f60298e9f93112df73bf95c5069bc43d42b3c0
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Wed Apr 12 21:42:03 2023 +0200
Only decode plan once
---
ballista/executor/src/executor_server.rs | 141 ++++++++++++++++++++++---------
1 file changed, 99 insertions(+), 42 deletions(-)
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 89f2eef5..ab6248b6 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -67,7 +67,7 @@ type SchedulerClients = Arc<DashMap<String, SchedulerGrpcClient<Channel>>>;
#[derive(Debug)]
struct CuratorTaskDefinition {
scheduler_id: String,
- task: TaskDefinition,
+ tasks: Vec<TaskDefinition>,
}
/// Wrap TaskStatus with its curator scheduler id for task update to its specific curator scheduler later
@@ -296,17 +296,57 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
}
}
- async fn run_task(
+ async fn decode_task(
&self,
task_identity: String,
- curator_task: CuratorTaskDefinition,
+ curator_task: TaskDefinition,
+ ) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
+ let runtime = self.executor.runtime.clone();
+ let task = curator_task;
+ let task_props = task.props.clone();
+ let mut task_scalar_functions = HashMap::new();
+ let mut task_aggregate_functions = HashMap::new();
+ for scalar_func in self.executor.scalar_functions.clone() {
+ task_scalar_functions.insert(scalar_func.0, scalar_func.1);
+ }
+ for agg_func in self.executor.aggregate_functions.clone() {
+ task_aggregate_functions.insert(agg_func.0, agg_func.1);
+ }
+
+ let task_context = Arc::new(TaskContext::new(
+ task_identity.clone(),
+ task.session_id.clone(),
+ task_props,
+ task_scalar_functions,
+ task_aggregate_functions,
+ runtime.clone(),
+ ));
+ let encoded_plan = &task.plan.as_slice();
+
+ Ok(U::try_decode(encoded_plan)
+ .and_then(|proto| {
+ proto.try_into_physical_plan(
+ task_context.deref(),
+ runtime.deref(),
+ self.codec.physical_extension_codec(),
+ )
+ })?
+ .clone())
+ }
+
+ async fn run_task(
+ &self,
+ task_identity: &str,
+ scheduler_id: String,
+ curator_task: TaskDefinition,
+ plan: Arc<dyn ExecutionPlan>,
) -> Result<(), BallistaError> {
let start_exec_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
info!("Start to run task {}", task_identity);
- let task = curator_task.task;
+ let task = curator_task;
let task_props = task.props;
let mut task_scalar_functions = HashMap::new();
@@ -322,7 +362,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let session_id = task.session_id;
let runtime = self.executor.runtime.clone();
let task_context = Arc::new(TaskContext::new(
- task_identity.clone(),
+ task_identity.to_string(),
session_id,
task_props,
task_scalar_functions,
@@ -330,17 +370,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
runtime.clone(),
));
- let encoded_plan = &task.plan.as_slice();
-
- let plan: Arc<dyn ExecutionPlan> =
- U::try_decode(encoded_plan).and_then(|proto| {
- proto.try_into_physical_plan(
- task_context.deref(),
- runtime.deref(),
- self.codec.physical_extension_codec(),
- )
- })?;
-
let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
task.output_partitioning.as_ref(),
task_context.as_ref(),
@@ -404,7 +433,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
task_execution_times,
);
- let scheduler_id = curator_task.scheduler_id;
let task_status_sender = self.executor_env.tx_task_status.clone();
task_status_sender
.send(CuratorTaskStatus {
@@ -587,30 +615,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
return;
}
};
- if let Some(curator_task) = maybe_task {
- let task_identity = format!(
- "TID {} {}/{}.{}/{}.{}",
- &curator_task.task.task_id,
- &curator_task.task.job_id,
- &curator_task.task.stage_id,
- &curator_task.task.stage_attempt_num,
- &curator_task.task.partition_id,
- &curator_task.task.task_attempt_num,
- );
- info!("Received task {:?}", &task_identity);
-
+ if let Some(task) = maybe_task {
let server = executor_server.clone();
- dedicated_executor.spawn(async move {
- server
- .run_task(task_identity.clone(), curator_task)
- .await
- .unwrap_or_else(|e| {
- error!(
- "Fail to run the task {:?} due to {:?}",
- task_identity, e
- );
- });
+ let first_task = task.tasks.get(0).unwrap().clone();
+ let out = dedicated_executor.spawn(async move {
+ let task_identity = format!(
+ "TID {} {}/{}.{}/{}.{}",
+ &first_task.task_id,
+ &first_task.job_id,
+ &first_task.stage_id,
+ &first_task.stage_attempt_num,
+ &first_task.partition_id,
+ &first_task.task_attempt_num,
+ );
+
+ server.decode_task(task_identity, first_task).await
});
+
+ let plan = out.await.unwrap().unwrap();
+ let scheduler_id = task.scheduler_id.clone();
+
+ for curator_task in task.tasks {
+ let plan = plan.clone();
+ let scheduler_id = scheduler_id.clone();
+
+ let task_identity = format!(
+ "TID {} {}/{}.{}/{}.{}",
+ &curator_task.task_id,
+ &curator_task.job_id,
+ &curator_task.stage_id,
+ &curator_task.stage_attempt_num,
+ &curator_task.partition_id,
+ &curator_task.task_attempt_num,
+ );
+ info!("Received task {:?}", &task_identity);
+
+ let server = executor_server.clone();
+ dedicated_executor.spawn(async move {
+ server
+ .run_task(
+ &task_identity,
+ scheduler_id,
+ curator_task,
+ plan,
+ )
+ .await
+ .unwrap_or_else(|e| {
+ error!(
+ "Fail to run the task {:?} due to {:?}",
+ task_identity, e
+ );
+ });
+ });
+ }
} else {
info!("Channel is closed and will exit the task receive loop");
drop(task_runner_complete);
@@ -638,9 +695,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
task_sender
.send(CuratorTaskDefinition {
scheduler_id: scheduler_id.clone(),
- task: task
+ tasks: vec![task
.try_into()
- .map_err(|e| Status::invalid_argument(format!("{e}")))?,
+ .map_err(|e| Status::invalid_argument(format!("{e}")))?],
})
.await
.unwrap();
@@ -667,7 +724,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
task_sender
.send(CuratorTaskDefinition {
scheduler_id: scheduler_id.clone(),
- task,
+ tasks: vec![task],
})
.await
.unwrap();