Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/28 22:08:26 UTC

[arrow-ballista] branch master updated: Use another channel to update the status of a task set for executor (#104)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new d2a3a29a Use another channel to update the status of a task set for executor  (#104)
d2a3a29a is described below

commit d2a3a29a35142eeab9457e2f361b29c25a094930
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Fri Jul 29 06:08:21 2022 +0800

    Use another channel to update the status of a task set for executor  (#104)
    
    * Use another channel to update the status of a task set for executor
    
    * fix clippy
---
 ballista/rust/executor/src/executor_server.rs | 88 ++++++++++++++++++++++-----
 1 file changed, 72 insertions(+), 16 deletions(-)
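
The change replaces the inline update_task_status rpc in the task runner with a dedicated mpsc channel and a background reporter loop. Below is a minimal, self-contained sketch of that shape; the `TaskDefinition` and `TaskStatus` structs and the println! reporter are hypothetical stand-ins for the real Ballista types and the UpdateTaskStatus rpc, not the actual implementation.

    use tokio::sync::mpsc;

    // Hypothetical, simplified stand-ins for Ballista's task types.
    #[derive(Debug)]
    struct TaskDefinition { task_id: u64 }
    #[derive(Debug)]
    struct TaskStatus { task_id: u64, finished: bool }

    #[tokio::main]
    async fn main() {
        // Channel 1: task definitions flowing from the rpc service to the runner.
        let (tx_task, mut rx_task) = mpsc::channel::<TaskDefinition>(1000);
        // Channel 2: task statuses flowing back out, the channel this commit adds.
        let (tx_task_status, mut rx_task_status) = mpsc::channel::<TaskStatus>(1000);

        // Runner loop: a finished task now costs only a cheap channel send
        // instead of awaiting an update_task_status rpc inline.
        tokio::spawn(async move {
            while let Some(task) = rx_task.recv().await {
                // ... run the task here ...
                let status = TaskStatus { task_id: task.task_id, finished: true };
                tx_task_status.send(status).await.unwrap();
            }
        });

        // Reporter loop: forwards statuses to the scheduler independently of
        // task execution.
        let reporter = tokio::spawn(async move {
            while let Some(status) = rx_task_status.recv().await {
                println!("would report {:?} to the scheduler", status);
            }
        });

        for task_id in 0..3 {
            tx_task.send(TaskDefinition { task_id }).await.unwrap();
        }
        // Dropping the task sender lets both loops drain and exit cleanly.
        drop(tx_task);
        reporter.await.unwrap();
    }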

diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index e782cf1f..7fa48939 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -34,13 +34,15 @@ use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
 use ballista_core::serde::protobuf::{
     HeartBeatParams, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
-    StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
+    StopExecutorParams, StopExecutorResult, TaskDefinition, TaskStatus,
+    UpdateTaskStatusParams,
 };
 use ballista_core::serde::scheduler::ExecutorState;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc::error::TryRecvError;
 
 use crate::as_task_status;
 use crate::cpu_bound_executor::DedicatedExecutor;
@@ -53,11 +55,15 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
 ) {
     // TODO make the buffer size configurable
     let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
+    let (tx_task_status, rx_task_status) = mpsc::channel::<TaskStatus>(1000);
 
     let executor_server = ExecutorServer::new(
         scheduler.clone(),
         executor.clone(),
-        ExecutorEnv { tx_task },
+        ExecutorEnv {
+            tx_task,
+            tx_task_status,
+        },
         codec,
     );
 
@@ -103,7 +109,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     // 4. Start TaskRunnerPool
     {
         let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
-        task_runner_pool.start(rx_task).await;
+        task_runner_pool.start(rx_task, rx_task_status).await;
     }
 }
 
@@ -137,7 +143,10 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPl
 
 #[derive(Clone)]
 struct ExecutorEnv {
+    /// Receives `TaskDefinition` from the rpc service and sends it to the CPU-bound task pool `dedicated_executor`.
     tx_task: mpsc::Sender<TaskDefinition>,
+    /// Receives `TaskStatus` from the CPU-bound task pool `dedicated_executor` and sends it back to the scheduler via rpc.
+    tx_task_status: mpsc::Sender<TaskStatus>,
 }
 
 unsafe impl Sync for ExecutorEnv {}
@@ -238,19 +247,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         debug!("Statistics: {:?}", execution_result);
 
         let executor_id = &self.executor.metadata.id;
-        // TODO use another channel to update the status of a task set
-        self.scheduler
-            .clone()
-            .update_task_status(UpdateTaskStatusParams {
-                executor_id: executor_id.clone(),
-                task_status: vec![as_task_status(
-                    execution_result,
-                    executor_id.clone(),
-                    task_id,
-                )],
-            })
-            .await?;
+        let task_status = as_task_status(execution_result, executor_id.clone(), task_id);
 
+        let task_status_sender = self.executor_env.tx_task_status.clone();
+        task_status_sender.send(task_status).await.unwrap();
         Ok(())
     }
 
@@ -292,7 +292,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         Self { executor_server }
     }
 
-    async fn start(&self, mut rx_task: mpsc::Receiver<TaskDefinition>) {
+    // Two loops (futures) run separately in the tokio runtime:
+    // the first sends task statuses back to the scheduler,
+    // the second receives tasks from the scheduler and runs them.
+    async fn start(
+        &self,
+        mut rx_task: mpsc::Receiver<TaskDefinition>,
+        mut rx_task_status: mpsc::Receiver<TaskStatus>,
+    ) {
+        // 1. Loop for task status reporting
+        let executor_server = self.executor_server.clone();
+        tokio::spawn(async move {
+            info!("Starting the task status reporter");
+            loop {
+                let mut tasks_status = vec![];
+                // First, fetch one task status from the channel in *blocking* mode
+                if let Some(task_status) = rx_task_status.recv().await {
+                    tasks_status.push(task_status);
+                } else {
+                    info!("Channel is closed, exiting the loop");
+                    return;
+                }
+
+                // Then drain the channel in non-blocking mode to fetch as many finished tasks as possible
+                loop {
+                    match rx_task_status.try_recv() {
+                        Ok(task_status) => {
+                            tasks_status.push(task_status);
+                        }
+                        Err(TryRecvError::Empty) => {
+                            info!(
+                                "Fetched {} task statuses to report",
+                                tasks_status.len()
+                            );
+                            break;
+                        }
+                        Err(TryRecvError::Disconnected) => {
+                            info!("Channel is closed, exiting the loop");
+                            return;
+                        }
+                    }
+                }
+
+                if let Err(e) = executor_server
+                    .scheduler
+                    .clone()
+                    .update_task_status(UpdateTaskStatusParams {
+                        executor_id: executor_server.executor.metadata.id.clone(),
+                        task_status: tasks_status.clone(),
+                    })
+                    .await
+                {
+                    error!("Failed to update tasks {:?} due to {:?}", tasks_status, e);
+                }
+            }
+        });
+
+        // 2. Loop for task fetching and running
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");