You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/03 14:01:25 UTC
[arrow-ballista] branch master updated: Add a CleanJobData gRPC call to the scheduler so that clients can explicitly trigger cleanup of a job's shuffle data (#485)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 0cddc1d5 Add a CleanJobData gRPC call to the scheduler so that clients can explicitly trigger cleanup of a job's shuffle data (#485)
0cddc1d5 is described below
commit 0cddc1d569d2cc1fdaefcdefd19a6a4ae26142ef
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Nov 3 22:01:19 2022 +0800
Add a CleanJobData gRPC call to the scheduler so that clients can explicitly trigger cleanup of a job's shuffle data (#485)
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/core/proto/ballista.proto | 9 ++++++
ballista/scheduler/src/scheduler_server/event.rs | 1 +
ballista/scheduler/src/scheduler_server/grpc.rs | 36 ++++++++++++++++++----
.../src/scheduler_server/query_stage_scheduler.rs | 3 ++
4 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index b5992f03..a38bf0ad 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -948,6 +948,13 @@ message CancelJobResult {
bool cancelled = 1;
}
+message CleanJobDataParams {
+ string job_id = 1;
+}
+
+message CleanJobDataResult {
+}
+
message LaunchTaskParams {
// Allow to launch a task set to an executor at once
repeated TaskDefinition tasks = 1;
@@ -1014,6 +1021,8 @@ service SchedulerGrpc {
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
rpc CancelJob (CancelJobParams) returns (CancelJobResult) {}
+
+ rpc CleanJobData (CleanJobDataParams) returns (CleanJobDataResult) {}
}
service ExecutorGrpc {
diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index 544976cd..cb1dd8e9 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -40,6 +40,7 @@ pub enum QueryStageSchedulerEvent {
JobRunningFailed(String, String),
JobUpdated(String),
JobCancel(String),
+ JobDataClean(String),
TaskUpdating(String, Vec<TaskStatus>),
ReservationOffering(Vec<ExecutorReservation>),
ExecutorLost(String, Option<String>),
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 73a1c64f..42805672 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -22,12 +22,12 @@ use std::convert::TryInto;
use ballista_core::serde::protobuf::executor_registration::OptionalHost;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
- executor_status, CancelJobParams, CancelJobResult, ExecuteQueryParams,
- ExecuteQueryResult, ExecutorHeartbeat, ExecutorStatus, ExecutorStoppedParams,
- ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
- GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
- PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
- UpdateTaskStatusParams, UpdateTaskStatusResult,
+ executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
+ CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
+ ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
+ GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+ HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+ RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::serde::AsExecutionPlan;
@@ -543,6 +543,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
})?;
Ok(Response::new(CancelJobResult { cancelled: true }))
}
+
+ async fn clean_job_data(
+ &self,
+ request: Request<CleanJobDataParams>,
+ ) -> Result<Response<CleanJobDataResult>, Status> {
+ let job_id = request.into_inner().job_id;
+ info!("Received clean data request for job {}", job_id);
+
+ self.query_stage_event_loop
+ .get_sender()
+ .map_err(|e| {
+ let msg = format!("Get query stage event loop error due to {:?}", e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?
+ .post_event(QueryStageSchedulerEvent::JobDataClean(job_id))
+ .await
+ .map_err(|e| {
+ let msg = format!("Post to query stage event loop error due to {:?}", e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?;
+ Ok(Response::new(CleanJobDataResult {}))
+ }
}
#[cfg(all(test, feature = "sled"))]
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 5c31bdaa..f49cae46 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -219,6 +219,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.cancel_running_tasks(tasks)
.await?
}
+ QueryStageSchedulerEvent::JobDataClean(job_id) => {
+ self.state.executor_manager.clean_up_job_data(job_id);
+ }
}
Ok(())