You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/28 22:08:26 UTC
[arrow-ballista] branch master updated: Use another channel to update the status of a task set for executor (#104)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new d2a3a29a Use another channel to update the status of a task set for executor (#104)
d2a3a29a is described below
commit d2a3a29a35142eeab9457e2f361b29c25a094930
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Fri Jul 29 06:08:21 2022 +0800
Use another channel to update the status of a task set for executor (#104)
* Use another channel to update the status of a task set for executor
* fix clippy
---
ballista/rust/executor/src/executor_server.rs | 88 ++++++++++++++++++++++-----
1 file changed, 72 insertions(+), 16 deletions(-)
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index e782cf1f..7fa48939 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -34,13 +34,15 @@ use ballista_core::serde::protobuf::executor_registration::OptionalHost;
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::{
HeartBeatParams, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
- StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
+ StopExecutorParams, StopExecutorResult, TaskDefinition, TaskStatus,
+ UpdateTaskStatusParams,
};
use ballista_core::serde::scheduler::ExecutorState;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc::error::TryRecvError;
use crate::as_task_status;
use crate::cpu_bound_executor::DedicatedExecutor;
@@ -53,11 +55,15 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
) {
// TODO make the buffer size configurable
let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
+ let (tx_task_status, rx_task_status) = mpsc::channel::<TaskStatus>(1000);
let executor_server = ExecutorServer::new(
scheduler.clone(),
executor.clone(),
- ExecutorEnv { tx_task },
+ ExecutorEnv {
+ tx_task,
+ tx_task_status,
+ },
codec,
);
@@ -103,7 +109,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
// 4. Start TaskRunnerPool
{
let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
- task_runner_pool.start(rx_task).await;
+ task_runner_pool.start(rx_task, rx_task_status).await;
}
}
@@ -137,7 +143,10 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPl
#[derive(Clone)]
struct ExecutorEnv {
+ /// Carries `TaskDefinition`s received over RPC to the CPU-bound task pool `dedicated_executor`.
tx_task: mpsc::Sender<TaskDefinition>,
+ /// Carries `TaskStatus`es from the CPU-bound task pool `dedicated_executor` so they can be sent back to the scheduler over RPC.
+ tx_task_status: mpsc::Sender<TaskStatus>,
}
unsafe impl Sync for ExecutorEnv {}
@@ -238,19 +247,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
debug!("Statistics: {:?}", execution_result);
let executor_id = &self.executor.metadata.id;
- // TODO use another channel to update the status of a task set
- self.scheduler
- .clone()
- .update_task_status(UpdateTaskStatusParams {
- executor_id: executor_id.clone(),
- task_status: vec![as_task_status(
- execution_result,
- executor_id.clone(),
- task_id,
- )],
- })
- .await?;
+ let task_status = as_task_status(execution_result, executor_id.clone(), task_id);
+ let task_status_sender = self.executor_env.tx_task_status.clone();
+ task_status_sender.send(task_status).await.unwrap();
Ok(())
}
@@ -292,7 +292,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
Self { executor_server }
}
- async fn start(&self, mut rx_task: mpsc::Receiver<TaskDefinition>) {
+ // Two loops (futures) run separately in the tokio runtime:
+ // the first sends task statuses back to the scheduler;
+ // the second receives tasks from the scheduler and runs them.
+ async fn start(
+ &self,
+ mut rx_task: mpsc::Receiver<TaskDefinition>,
+ mut rx_task_status: mpsc::Receiver<TaskStatus>,
+ ) {
+ //1. loop for task status reporting
+ let executor_server = self.executor_server.clone();
+ tokio::spawn(async move {
+ info!("Starting the task status reporter");
+ loop {
+ let mut tasks_status = vec![];
+ // First try to fetch task status from the channel in *blocking* mode
+ if let Some(task_status) = rx_task_status.recv().await {
+ tasks_status.push(task_status);
+ } else {
+ info!("Channel is closed and will exit the loop");
+ return;
+ }
+
+ // Then drain the channel in non-blocking mode to batch as many finished task statuses as possible
+ loop {
+ match rx_task_status.try_recv() {
+ Ok(task_sta) => {
+ tasks_status.push(task_sta);
+ }
+ Err(TryRecvError::Empty) => {
+ info!(
+ "Fetched {} tasks status to report",
+ tasks_status.len()
+ );
+ break;
+ }
+ Err(TryRecvError::Disconnected) => {
+ info!("Channel is closed and will exit the loop");
+ return;
+ }
+ }
+ }
+
+ if let Err(e) = executor_server
+ .scheduler
+ .clone()
+ .update_task_status(UpdateTaskStatusParams {
+ executor_id: executor_server.executor.metadata.id.clone(),
+ task_status: tasks_status.clone(),
+ })
+ .await
+ {
+ error!("Fail to update tasks {:?} due to {:?}", tasks_status, e);
+ }
+ }
+ });
+
+ //2. loop for task fetching and running
let executor_server = self.executor_server.clone();
tokio::spawn(async move {
info!("Starting the task runner pool");