Posted to commits@arrow.apache.org by dh...@apache.org on 2023/04/12 19:42:11 UTC

[arrow-ballista] branch only_decode_once updated: Only decode plan once

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch only_decode_once
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/only_decode_once by this push:
     new 28f60298 Only decode plan once
28f60298 is described below

commit 28f60298e9f93112df73bf95c5069bc43d42b3c0
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Wed Apr 12 21:42:03 2023 +0200

    Only decode plan once
---
 ballista/executor/src/executor_server.rs | 141 ++++++++++++++++++++++---------
 1 file changed, 99 insertions(+), 42 deletions(-)
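
In short, the commit splits plan decoding out of run_task: the serialized plan carried by a batch of tasks is decoded exactly once (in the new decode_task), and each task then receives a cheap clone of the resulting Arc<dyn ExecutionPlan>. A minimal, self-contained sketch of the pattern, using a stand-in Plan type in place of the real trait object:

    use std::sync::Arc;

    // Stand-in for the decoded physical plan; in Ballista this is Arc<dyn ExecutionPlan>.
    struct Plan(Vec<u8>);

    // Hypothetical decode step; in the commit this is ExecutorServer::decode_task.
    fn decode(bytes: &[u8]) -> Arc<Plan> {
        Arc::new(Plan(bytes.to_vec()))
    }

    fn main() {
        let encoded = vec![1u8, 2, 3];
        let plan = decode(&encoded); // decode once per batch
        for task_id in 0..4 {
            let plan = Arc::clone(&plan); // per-task clone: refcount bump, no re-decode
            // run_task(task, plan) would execute against the shared plan here
            println!("task {task_id} shares the plan ({} refs)", Arc::strong_count(&plan));
        }
    }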

diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 89f2eef5..ab6248b6 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -67,7 +67,7 @@ type SchedulerClients = Arc<DashMap<String, SchedulerGrpcClient<Channel>>>;
 #[derive(Debug)]
 struct CuratorTaskDefinition {
     scheduler_id: String,
-    task: TaskDefinition,
+    tasks: Vec<TaskDefinition>,
 }
 
 /// Wrap TaskStatus with its curator scheduler id for task update to its specific curator scheduler later
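
A CuratorTaskDefinition now carries a batch of tasks for one curator scheduler rather than a single task. The decode-once path below only makes sense if every TaskDefinition in the batch was serialized from the same plan; a hypothetical guard for that invariant (not part of the commit) might look like:

    // Sketch only: `plan` stands in for the TaskDefinition plan bytes (Vec<u8>).
    struct TaskDefinition {
        plan: Vec<u8>,
    }

    // A batch is safe to decode once only if all tasks carry identical plan bytes.
    fn plans_match(tasks: &[TaskDefinition]) -> bool {
        tasks.windows(2).all(|w| w[0].plan == w[1].plan)
    }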
@@ -296,17 +296,57 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         }
     }
 
-    async fn run_task(
+    async fn decode_task(
         &self,
         task_identity: String,
-        curator_task: CuratorTaskDefinition,
+        curator_task: TaskDefinition,
+    ) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
+        let runtime = self.executor.runtime.clone();
+        let task = curator_task;
+        let task_props = task.props.clone();
+        let mut task_scalar_functions = HashMap::new();
+        let mut task_aggregate_functions = HashMap::new();
+        for scalar_func in self.executor.scalar_functions.clone() {
+            task_scalar_functions.insert(scalar_func.0, scalar_func.1);
+        }
+        for agg_func in self.executor.aggregate_functions.clone() {
+            task_aggregate_functions.insert(agg_func.0, agg_func.1);
+        }
+
+        let task_context = Arc::new(TaskContext::new(
+            task_identity.clone(),
+            task.session_id.clone(),
+            task_props,
+            task_scalar_functions,
+            task_aggregate_functions,
+            runtime.clone(),
+        ));
+        let encoded_plan = &task.plan.as_slice();
+
+        Ok(U::try_decode(encoded_plan)
+            .and_then(|proto| {
+                proto.try_into_physical_plan(
+                    task_context.deref(),
+                    runtime.deref(),
+                    self.codec.physical_extension_codec(),
+                )
+            })?
+            .clone())
+    }
+
+    async fn run_task(
+        &self,
+        task_identity: &str,
+        scheduler_id: String,
+        curator_task: TaskDefinition,
+        plan: Arc<dyn ExecutionPlan>,
     ) -> Result<(), BallistaError> {
         let start_exec_time = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap()
             .as_millis() as u64;
         info!("Start to run task {}", task_identity);
-        let task = curator_task.task;
+        let task = curator_task;
         let task_props = task.props;
 
         let mut task_scalar_functions = HashMap::new();
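
Note that decode_task returns an Arc<dyn ExecutionPlan>, so the trailing .clone() above copies only the smart-pointer handle, not the decoded plan tree; the same holds for the per-task clones in the runner loop further down. A minimal illustration of that sharing with a stand-in trait:

    use std::sync::Arc;

    trait ExecPlan {
        fn name(&self) -> &str;
    }

    struct Scan;
    impl ExecPlan for Scan {
        fn name(&self) -> &str {
            "Scan"
        }
    }

    fn main() {
        let plan: Arc<dyn ExecPlan> = Arc::new(Scan);
        let shared = plan.clone(); // refcount bump; both handles point at the same Scan
        assert!(Arc::ptr_eq(&plan, &shared));
        println!("{} handles share one {}", Arc::strong_count(&plan), shared.name());
    }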
@@ -322,7 +362,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         let session_id = task.session_id;
         let runtime = self.executor.runtime.clone();
         let task_context = Arc::new(TaskContext::new(
-            task_identity.clone(),
+            task_identity.to_string(),
             session_id,
             task_props,
             task_scalar_functions,
@@ -330,17 +370,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
             runtime.clone(),
         ));
 
-        let encoded_plan = &task.plan.as_slice();
-
-        let plan: Arc<dyn ExecutionPlan> =
-            U::try_decode(encoded_plan).and_then(|proto| {
-                proto.try_into_physical_plan(
-                    task_context.deref(),
-                    runtime.deref(),
-                    self.codec.physical_extension_codec(),
-                )
-            })?;
-
         let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
             task.output_partitioning.as_ref(),
             task_context.as_ref(),
@@ -404,7 +433,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
             task_execution_times,
         );
 
-        let scheduler_id = curator_task.scheduler_id;
         let task_status_sender = self.executor_env.tx_task_status.clone();
         task_status_sender
             .send(CuratorTaskStatus {
@@ -587,30 +615,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
                         return;
                     }
                 };
-                if let Some(curator_task) = maybe_task {
-                    let task_identity = format!(
-                        "TID {} {}/{}.{}/{}.{}",
-                        &curator_task.task.task_id,
-                        &curator_task.task.job_id,
-                        &curator_task.task.stage_id,
-                        &curator_task.task.stage_attempt_num,
-                        &curator_task.task.partition_id,
-                        &curator_task.task.task_attempt_num,
-                    );
-                    info!("Received task {:?}", &task_identity);
-
+                if let Some(task) = maybe_task {
                     let server = executor_server.clone();
-                    dedicated_executor.spawn(async move {
-                        server
-                            .run_task(task_identity.clone(), curator_task)
-                            .await
-                            .unwrap_or_else(|e| {
-                                error!(
-                                    "Fail to run the task {:?} due to {:?}",
-                                    task_identity, e
-                                );
-                            });
+                    let first_task = task.tasks.get(0).unwrap().clone();
+                    let out = dedicated_executor.spawn(async move {
+                        let task_identity = format!(
+                            "TID {} {}/{}.{}/{}.{}",
+                            &first_task.task_id,
+                            &first_task.job_id,
+                            &first_task.stage_id,
+                            &first_task.stage_attempt_num,
+                            &first_task.partition_id,
+                            &first_task.task_attempt_num,
+                        );
+
+                        server.decode_task(task_identity, first_task).await
                     });
+
+                    let plan = out.await.unwrap().unwrap();
+                    let scheduler_id = task.scheduler_id.clone();
+
+                    for curator_task in task.tasks {
+                        let plan = plan.clone();
+                        let scheduler_id = scheduler_id.clone();
+
+                        let task_identity = format!(
+                            "TID {} {}/{}.{}/{}.{}",
+                            &curator_task.task_id,
+                            &curator_task.job_id,
+                            &curator_task.stage_id,
+                            &curator_task.stage_attempt_num,
+                            &curator_task.partition_id,
+                            &curator_task.task_attempt_num,
+                        );
+                        info!("Received task {:?}", &task_identity);
+
+                        let server = executor_server.clone();
+                        dedicated_executor.spawn(async move {
+                            server
+                                .run_task(
+                                    &task_identity,
+                                    scheduler_id,
+                                    curator_task,
+                                    plan,
+                                )
+                                .await
+                                .unwrap_or_else(|e| {
+                                    error!(
+                                        "Fail to run the task {:?} due to {:?}",
+                                        task_identity, e
+                                    );
+                                });
+                        });
+                    }
                 } else {
                     info!("Channel is closed and will exit the task receive loop");
                     drop(task_runner_complete);
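
One sharp edge in the loop above: task.tasks.get(0).unwrap() and out.await.unwrap().unwrap() will panic the receive loop if a batch arrives empty or the plan fails to decode. An alternative sketch (not what the commit does) that logs and skips instead, assuming the same surrounding loop, the log macros already used in this file, and that the spawned job yields a nested Result as the double unwrap suggests:

    // Alternative sketch, not in the commit: skip bad batches instead of panicking.
    let Some(first_task) = task.tasks.first().cloned() else {
        error!("Received an empty task batch; skipping");
        continue;
    };
    // ...spawn the decode as above, then replace out.await.unwrap().unwrap() with:
    let plan = match out.await {
        Ok(Ok(plan)) => plan,
        Ok(Err(e)) => {
            error!("Failed to decode plan: {e:?}");
            continue;
        }
        Err(e) => {
            error!("Decode task was cancelled or panicked: {e:?}");
            continue;
        }
    };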
@@ -638,9 +695,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
             task_sender
                 .send(CuratorTaskDefinition {
                     scheduler_id: scheduler_id.clone(),
-                    task: task
+                    tasks: vec![task
                         .try_into()
-                        .map_err(|e| Status::invalid_argument(format!("{e}")))?,
+                        .map_err(|e| Status::invalid_argument(format!("{e}")))?],
                 })
                 .await
                 .unwrap();
@@ -667,7 +724,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
                 task_sender
                     .send(CuratorTaskDefinition {
                         scheduler_id: scheduler_id.clone(),
-                        task,
+                        tasks: vec![task],
                     })
                     .await
                     .unwrap();
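
Both gRPC entry points still wrap each incoming TaskDefinition in a one-element Vec, so batches currently contain a single task and the decode is shared only within that batch. A hypothetical follow-up (not part of this commit) could group tasks that carry identical plan bytes before sending them down the channel, reusing the stand-in TaskDefinition from the earlier sketch:

    use std::collections::HashMap;

    // Hypothetical grouping step: tasks with byte-identical plans land in one
    // batch, so decode_task runs once per distinct plan rather than once per task.
    fn group_by_plan(tasks: Vec<TaskDefinition>) -> Vec<Vec<TaskDefinition>> {
        let mut groups: HashMap<Vec<u8>, Vec<TaskDefinition>> = HashMap::new();
        for t in tasks {
            groups.entry(t.plan.clone()).or_default().push(t);
        }
        groups.into_values().collect()
    }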