You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/03 14:01:25 UTC

[arrow-ballista] branch master updated: Add grpc service for the scheduler to make it able for the job shuffle data be cleaned up to be triggered by client explicitly (#485)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cddc1d5 Add grpc service for the scheduler to make it able for the job shuffle data be cleaned up to be triggered by client explicitly (#485)
0cddc1d5 is described below

commit 0cddc1d569d2cc1fdaefcdefd19a6a4ae26142ef
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Nov 3 22:01:19 2022 +0800

    Add grpc service for the scheduler to make it able for the job shuffle data be cleaned up to be triggered by client explicitly (#485)
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/core/proto/ballista.proto                 |  9 ++++++
 ballista/scheduler/src/scheduler_server/event.rs   |  1 +
 ballista/scheduler/src/scheduler_server/grpc.rs    | 36 ++++++++++++++++++----
 .../src/scheduler_server/query_stage_scheduler.rs  |  3 ++
 4 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index b5992f03..a38bf0ad 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -948,6 +948,13 @@ message CancelJobResult {
   bool cancelled = 1;
 }
 
+message CleanJobDataParams {
+  string job_id = 1;
+}
+
+message CleanJobDataResult {
+}
+
 message LaunchTaskParams {
   // Allow to launch a task set to an executor at once
   repeated TaskDefinition tasks = 1;
@@ -1014,6 +1021,8 @@ service SchedulerGrpc {
   rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
 
   rpc CancelJob (CancelJobParams) returns (CancelJobResult) {}
+
+  rpc CleanJobData (CleanJobDataParams) returns (CleanJobDataResult) {}
 }
 
 service ExecutorGrpc {
diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index 544976cd..cb1dd8e9 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -40,6 +40,7 @@ pub enum QueryStageSchedulerEvent {
     JobRunningFailed(String, String),
     JobUpdated(String),
     JobCancel(String),
+    JobDataClean(String),
     TaskUpdating(String, Vec<TaskStatus>),
     ReservationOffering(Vec<ExecutorReservation>),
     ExecutorLost(String, Option<String>),
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 73a1c64f..42805672 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -22,12 +22,12 @@ use std::convert::TryInto;
 use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
-    executor_status, CancelJobParams, CancelJobResult, ExecuteQueryParams,
-    ExecuteQueryResult, ExecutorHeartbeat, ExecutorStatus, ExecutorStoppedParams,
-    ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
-    PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
-    UpdateTaskStatusParams, UpdateTaskStatusResult,
+    executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
+    CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
+    ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+    HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+    RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use ballista_core::serde::AsExecutionPlan;
@@ -543,6 +543,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             })?;
         Ok(Response::new(CancelJobResult { cancelled: true }))
     }
+
+    async fn clean_job_data(
+        &self,
+        request: Request<CleanJobDataParams>,
+    ) -> Result<Response<CleanJobDataResult>, Status> {
+        let job_id = request.into_inner().job_id;
+        info!("Received clean data request for job {}", job_id);
+
+        self.query_stage_event_loop
+            .get_sender()
+            .map_err(|e| {
+                let msg = format!("Get query stage event loop error due to {:?}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?
+            .post_event(QueryStageSchedulerEvent::JobDataClean(job_id))
+            .await
+            .map_err(|e| {
+                let msg = format!("Post to query stage event loop error due to {:?}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?;
+        Ok(Response::new(CleanJobDataResult {}))
+    }
 }
 
 #[cfg(all(test, feature = "sled"))]
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 5c31bdaa..f49cae46 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -219,6 +219,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     .cancel_running_tasks(tasks)
                     .await?
             }
+            QueryStageSchedulerEvent::JobDataClean(job_id) => {
+                self.state.executor_manager.clean_up_job_data(job_id);
+            }
         }
 
         Ok(())