Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/12 17:48:58 UTC

[arrow-ballista] branch master updated: Clean up job data on both Scheduler and Executor (#188)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new e42a6c9c Clean up job data on both Scheduler and Executor (#188)
e42a6c9c is described below

commit e42a6c9c2de4530adb22bd37a9bea17d60f2582f
Author: mingmwang <mi...@ebay.com>
AuthorDate: Thu Oct 13 01:48:53 2022 +0800

    Clean up job data on both Scheduler and Executor (#188)
---
 ballista/rust/core/proto/ballista.proto            |  9 +++
 ballista/rust/executor/src/executor_server.rs      | 19 ++++-
 .../rust/scheduler/src/scheduler_server/event.rs   |  1 +
 .../rust/scheduler/src/scheduler_server/grpc.rs    | 23 +++++-
 .../src/scheduler_server/query_stage_scheduler.rs  | 48 +++++++++++-
 ballista/rust/scheduler/src/state/backend/mod.rs   |  2 +-
 .../rust/scheduler/src/state/executor_manager.rs   | 30 +++++++-
 ballista/rust/scheduler/src/state/mod.rs           |  2 +-
 ballista/rust/scheduler/src/state/task_manager.rs  | 85 +++++++++++++++++++---
 9 files changed, 198 insertions(+), 21 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 9cb8cca4..67d0be57 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -975,6 +975,13 @@ message CancelTasksResult {
   bool cancelled = 1;
 }
 
+message RemoveJobDataParams {
+  string job_id = 1;
+}
+
+message RemoveJobDataResult {
+}
+
 message RunningTaskInfo {
   uint32 task_id = 1;
   string job_id = 2;
@@ -1014,4 +1021,6 @@ service ExecutorGrpc {
   rpc StopExecutor (StopExecutorParams) returns (StopExecutorResult) {}
 
   rpc CancelTasks (CancelTasksParams) returns (CancelTasksResult) {}
+
+  rpc RemoveJobData (RemoveJobDataParams) returns (RemoveJobDataResult) {}
 }
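
The new RemoveJobData RPC gives the scheduler a way to ask an executor to delete a job's shuffle data. Below is a minimal sketch of calling it from the generated tonic client; the endpoint address is a placeholder, and the sketch assumes the generated client exposes tonic's usual connect constructor:

    use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
    use ballista_core::serde::protobuf::RemoveJobDataParams;

    async fn remove_job_data(addr: String, job_id: &str) -> Result<(), Box<dyn std::error::Error>> {
        // Connect to the executor's gRPC endpoint, e.g. "http://localhost:50051".
        let mut client = ExecutorGrpcClient::connect(addr).await?;
        // RemoveJobDataResult carries no fields, so a successful reply is the whole answer.
        client
            .remove_job_data(RemoveJobDataParams {
                job_id: job_id.to_owned(),
            })
            .await?;
        Ok(())
    }
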
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index e850c895..acf5c873 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -19,6 +19,7 @@ use ballista_core::BALLISTA_VERSION;
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::ops::Deref;
+use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
@@ -37,7 +38,8 @@ use ballista_core::serde::protobuf::{
     executor_metric, executor_status, CancelTasksParams, CancelTasksResult,
     ExecutorMetric, ExecutorStatus, HeartBeatParams, LaunchMultiTaskParams,
     LaunchMultiTaskResult, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
-    StopExecutorParams, StopExecutorResult, TaskStatus, UpdateTaskStatusParams,
+    RemoveJobDataParams, RemoveJobDataResult, StopExecutorParams, StopExecutorResult,
+    TaskStatus, UpdateTaskStatusParams,
 };
 use ballista_core::serde::scheduler::PartitionId;
 use ballista_core::serde::scheduler::TaskDefinition;
@@ -710,4 +712,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
 
         Ok(Response::new(CancelTasksResult { cancelled }))
     }
+
+    async fn remove_job_data(
+        &self,
+        request: Request<RemoveJobDataParams>,
+    ) -> Result<Response<RemoveJobDataResult>, Status> {
+        let job_id = request.into_inner().job_id;
+        let work_dir = self.executor.work_dir.clone();
+        let mut path = PathBuf::from(work_dir);
+        path.push(job_id.clone());
+        if path.is_dir() {
+            info!("Remove data for job {:?}", job_id);
+            std::fs::remove_dir_all(&path)?;
+        }
+        Ok(Response::new(RemoveJobDataResult {}))
+    }
 }
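
The handler resolves <work_dir>/<job_id> and removes it recursively only if the directory exists, so repeated cleanup calls are harmless. The same logic in isolation (a sketch; in the real handler work_dir comes from the executor's configuration):

    use std::path::PathBuf;

    /// Remove a job's working directory if present; idempotent by design.
    fn remove_job_dir(work_dir: &str, job_id: &str) -> std::io::Result<()> {
        let mut path = PathBuf::from(work_dir);
        path.push(job_id);
        if path.is_dir() {
            std::fs::remove_dir_all(&path)?;
        }
        Ok(())
    }
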
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs
index d01c0516..f206594f 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -39,6 +39,7 @@ pub enum QueryStageSchedulerEvent {
     // For a job that fails, with its execution graph marked as failed
     JobRunningFailed(String, String),
     JobUpdated(String),
+    JobCancel(String),
     TaskUpdating(String, Vec<TaskStatus>),
     ReservationOffering(Vec<ExecutorReservation>),
     ExecutorLost(String, Option<String>),
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 9393ed78..ddd212a1 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -42,6 +42,8 @@ use log::{debug, error, info, warn};
 
 use std::ops::Deref;
 use std::sync::Arc;
+
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
@@ -529,10 +531,23 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         request: Request<CancelJobParams>,
     ) -> Result<Response<CancelJobResult>, Status> {
         let job_id = request.into_inner().job_id;
-        match self.state.cancel_job(&job_id).await {
-            Ok(cancelled) => Ok(Response::new(CancelJobResult { cancelled })),
-            Err(e) => Err(Status::internal(e.to_string())),
-        }
+        info!("Received cancellation request for job {}", job_id);
+
+        self.query_stage_event_loop
+            .get_sender()
+            .map_err(|e| {
+                let msg = format!("Get query stage event loop error due to {:?}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?
+            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+            .await
+            .map_err(|e| {
+                let msg = format!("Post to query stage event loop error due to {:?}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?;
+        Ok(Response::new(CancelJobResult { cancelled: true }))
     }
 }
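
With this change cancel_job no longer cancels inline: it posts a JobCancel event to the query stage event loop and immediately replies with cancelled: true, so the reply means "accepted", not "completed". The same fire-and-forget shape with a plain tokio channel (names are illustrative, not Ballista's event loop API):

    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum SchedulerEvent {
        JobCancel(String),
    }

    /// Enqueue the cancellation and return without waiting for it to run.
    async fn post_cancel(tx: &mpsc::Sender<SchedulerEvent>, job_id: String) -> Result<(), String> {
        tx.send(SchedulerEvent::JobCancel(job_id))
            .await
            .map_err(|e| format!("Post to event loop failed: {:?}", e))
    }
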
 
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 96de88a8..4f7d12a1 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use log::{debug, error, info};
@@ -33,6 +34,10 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::state::executor_manager::ExecutorReservation;
 use crate::state::SchedulerState;
 
+// TODO move to configuration file
+/// Delay in seconds before finished job data is cleaned up
+pub const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;
+
 pub(crate) struct QueryStageScheduler<
     T: 'static + AsLogicalPlan,
     U: 'static + AsExecutionPlan,
@@ -131,30 +136,67 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 error!("Job {} failed: {}", job_id, failure_reason);
                 self.state
                     .task_manager
-                    .fail_unscheduled_job(&job_id, failure_reason)
+                    .fail_unscheduled_job(
+                        &job_id,
+                        failure_reason,
+                        CLEANUP_FINISHED_JOB_DELAY_SECS,
+                    )
                     .await?;
             }
             QueryStageSchedulerEvent::JobFinished(job_id) => {
                 info!("Job {} success", job_id);
-                self.state.task_manager.succeed_job(&job_id).await?;
+                self.state
+                    .task_manager
+                    .succeed_job(&job_id, CLEANUP_FINISHED_JOB_DELAY_SECS)
+                    .await?;
+                let executor_manager = self.state.executor_manager.clone();
+                tokio::spawn(async move {
+                    tokio::time::sleep(Duration::from_secs(
+                        CLEANUP_FINISHED_JOB_DELAY_SECS,
+                    ))
+                    .await;
+                    executor_manager.clean_up_executors_data(job_id).await;
+                });
             }
             QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason) => {
                 error!("Job {} running failed", job_id);
                 let tasks = self
                     .state
                     .task_manager
-                    .abort_job(&job_id, failure_reason)
+                    .abort_job(&job_id, failure_reason, CLEANUP_FINISHED_JOB_DELAY_SECS)
                     .await?;
                 if !tasks.is_empty() {
                     tx_event
                         .post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
                         .await?;
                 }
+                let executor_manager = self.state.executor_manager.clone();
+                tokio::spawn(async move {
+                    tokio::time::sleep(Duration::from_secs(
+                        CLEANUP_FINISHED_JOB_DELAY_SECS,
+                    ))
+                    .await;
+                    executor_manager.clean_up_executors_data(job_id).await;
+                });
             }
             QueryStageSchedulerEvent::JobUpdated(job_id) => {
                 info!("Job {} Updated", job_id);
                 self.state.task_manager.update_job(&job_id).await?;
             }
+            QueryStageSchedulerEvent::JobCancel(job_id) => {
+                self.state
+                    .task_manager
+                    .cancel_job(&job_id, CLEANUP_FINISHED_JOB_DELAY_SECS)
+                    .await?;
+                let executor_manager = self.state.executor_manager.clone();
+                tokio::spawn(async move {
+                    tokio::time::sleep(Duration::from_secs(
+                        CLEANUP_FINISHED_JOB_DELAY_SECS,
+                    ))
+                    .await;
+                    executor_manager.clean_up_executors_data(job_id).await;
+                });
+            }
             QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
                 let num_status = tasks_status.len();
                 match self
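
All three terminal events (JobFinished, JobRunningFailed, JobCancel) now end with the same spawned future: sleep for CLEANUP_FINISHED_JOB_DELAY_SECS, then ask the executors to drop the job's data. The shared pattern, factored into a hypothetical helper (not part of this commit):

    use std::future::Future;
    use std::time::Duration;

    /// Run `cleanup` once after `delay_secs` seconds, without blocking the caller.
    fn schedule_delayed_cleanup<Fut>(delay_secs: u64, cleanup: Fut)
    where
        Fut: Future<Output = ()> + Send + 'static,
    {
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
            cleanup.await;
        });
    }

Each event arm would then reduce to a single call such as schedule_delayed_cleanup(CLEANUP_FINISHED_JOB_DELAY_SECS, async move { executor_manager.clean_up_executors_data(job_id).await }).
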
diff --git a/ballista/rust/scheduler/src/state/backend/mod.rs b/ballista/rust/scheduler/src/state/backend/mod.rs
index 85c0f34d..cbba3ed1 100644
--- a/ballista/rust/scheduler/src/state/backend/mod.rs
+++ b/ballista/rust/scheduler/src/state/backend/mod.rs
@@ -49,7 +49,7 @@ impl parse_arg::ParseArgFromStr for StateBackend {
     }
 }
 
-#[derive(Debug, Eq, PartialEq, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub enum Keyspace {
     Executors,
     ActiveJobs,
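
Keyspace gains Clone because the new scheduler-side cleanup uses one Keyspace value twice: a clone for taking the keyspace lock and the original for the delete (see clean_up_job_data in task_manager.rs below). In miniature:

    #[derive(Debug, Clone, Eq, PartialEq, Hash)]
    enum Keyspace {
        CompletedJobs,
        FailedJobs,
    }

    fn take_by_value(_ks: Keyspace) {}

    fn lock_then_delete(keyspace: Keyspace) {
        // Without the Clone derive there is no way to pass `keyspace`
        // by value to both calls.
        take_by_value(keyspace.clone()); // stands in for state.lock(...)
        take_by_value(keyspace);         // stands in for state.delete(...)
    }
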
diff --git a/ballista/rust/scheduler/src/state/executor_manager.rs b/ballista/rust/scheduler/src/state/executor_manager.rs
index 322e5a7b..70fc7a0a 100644
--- a/ballista/rust/scheduler/src/state/executor_manager.rs
+++ b/ballista/rust/scheduler/src/state/executor_manager.rs
@@ -27,12 +27,13 @@ use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
     executor_status, CancelTasksParams, ExecutorHeartbeat, ExecutorStatus,
+    RemoveJobDataParams,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use ballista_core::utils::create_grpc_client_connection;
 use dashmap::{DashMap, DashSet};
 use futures::StreamExt;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tonic::transport::Channel;
@@ -258,6 +259,33 @@ impl ExecutorManager {
         Ok(())
     }
 
+    /// Send an RPC to each alive executor to clean up the job data
+    pub async fn clean_up_executors_data(&self, job_id: String) {
+        let alive_executors = self.get_alive_executors_within_one_minute();
+        for executor in alive_executors {
+            let job_id_clone = job_id.to_owned();
+            if let Ok(mut client) = self.get_client(&executor).await {
+                tokio::spawn(async move {
+                    {
+                        if let Err(err) = client
+                            .remove_job_data(RemoveJobDataParams {
+                                job_id: job_id_clone,
+                            })
+                            .await
+                        {
+                            warn!(
+                            "Failed to call remove_job_data on Executor {} due to {:?}",
+                            executor, err
+                        )
+                        }
+                    }
+                });
+            } else {
+                warn!("Failed to get client for Executor {}", executor)
+            }
+        }
+    }
+
     pub async fn get_client(
         &self,
         executor_id: &str,
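
clean_up_executors_data is best-effort: it fans out one detached task per alive executor and only logs failures, so an unreachable executor never blocks cleanup. For contrast, here is a sketch of the same fan-out that awaits all the RPCs via tokio::task::JoinSet (an alternative shape, not what the commit does; call_remove_job_data stands in for the gRPC call):

    use tokio::task::JoinSet;

    async fn call_remove_job_data(_executor: &str, _job_id: &str) -> Result<(), String> {
        Ok(()) // placeholder for the RemoveJobData RPC
    }

    /// Fan out one cleanup call per executor and wait for all of them.
    async fn clean_up_all(executors: Vec<String>, job_id: String) {
        let mut set = JoinSet::new();
        for executor in executors {
            let job_id = job_id.clone();
            set.spawn(async move {
                if let Err(err) = call_remove_job_data(&executor, &job_id).await {
                    eprintln!("Failed to clean up job data on {}: {}", executor, err);
                }
            });
        }
        // Unlike the detached spawns in the commit, this waits for completion.
        while set.join_next().await.is_some() {}
    }
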
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 8cdd74d6..57fdc3c0 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -276,7 +276,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
         info!("Received cancellation request for job {}", job_id);
 
-        match self.task_manager.cancel_job(job_id).await {
+        match self.task_manager.cancel_job(job_id, 300).await {
             Ok(tasks) => {
                 self.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
                         let msg = format!("Error to cancel running tasks when cancelling job {} due to {:?}", job_id, e);
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index acafbcd8..ddd5ba9c 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -44,6 +44,7 @@ use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
+use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
 type ExecutionGraphCache = Arc<DashMap<String, Arc<RwLock<ExecutionGraph>>>>;
@@ -287,7 +288,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
 
     /// Mark a job as successful. This will create a key under the CompletedJobs keyspace
     /// and remove the job from ActiveJobs
-    pub async fn succeed_job(&self, job_id: &str) -> Result<()> {
+    pub(crate) async fn succeed_job(
+        &self,
+        job_id: &str,
+        clean_up_interval_in_secs: u64,
+    ) -> Result<()> {
         debug!("Moving job {} from Active to Success", job_id);
         let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
         with_lock(lock, self.state.delete(Keyspace::ActiveJobs, job_id)).await?;
@@ -301,17 +306,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                     .await?;
             } else {
                 error!("Job {} has not finished and cannot be completed", job_id);
+                return Ok(());
             }
         } else {
             warn!("Fail to find job {} in the cache", job_id);
         }
 
+        // spawn a delayed future to clean up the job data on the Scheduler (Executors are notified separately)
+        let state = self.state.clone();
+        let job_id_str = job_id.to_owned();
+        let active_job_cache = self.active_job_cache.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
+            Self::clean_up_job_data(state, active_job_cache, false, job_id_str).await
+        });
+
         Ok(())
     }
 
     /// Cancel the job and return a Vec of running tasks that need to be cancelled
-    pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<Vec<RunningTaskInfo>> {
-        self.abort_job(job_id, "Cancelled".to_owned()).await
+    pub(crate) async fn cancel_job(
+        &self,
+        job_id: &str,
+        clean_up_interval_in_secs: u64,
+    ) -> Result<Vec<RunningTaskInfo>> {
+        self.abort_job(job_id, "Cancelled".to_owned(), clean_up_interval_in_secs)
+            .await
     }
 
     /// Abort the job and return a Vec of running tasks that need to be cancelled
@@ -319,6 +339,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         &self,
         job_id: &str,
         failure_reason: String,
+        clean_up_interval_in_secs: u64,
     ) -> Result<Vec<RunningTaskInfo>> {
         let locks = self
             .state
@@ -327,7 +348,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 (Keyspace::FailedJobs, job_id),
             ])
             .await?;
-        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+        let tasks_to_cancel = if let Some(graph) =
+            self.get_active_execution_graph(job_id).await
+        {
             let running_tasks = graph.read().await.running_tasks();
             info!(
                 "Cancelling {} running tasks for job {}",
@@ -335,22 +358,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 job_id
             );
             with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
-            Ok(running_tasks)
+            running_tasks
         } else {
             // TODO listen the job state update event and fix task cancelling
             warn!("Fail to find job {} in the cache, unable to cancel tasks for job, fail the job state only.", job_id);
             with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
-            Ok(vec![])
-        }
+            vec![]
+        };
+
+        // spawn a delayed future to clean up the job data on the Scheduler (Executors are notified separately)
+        let state = self.state.clone();
+        let job_id_str = job_id.to_owned();
+        let active_job_cache = self.active_job_cache.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
+            Self::clean_up_job_data(state, active_job_cache, true, job_id_str).await
+        });
+
+        Ok(tasks_to_cancel)
     }
 
     /// Mark an unscheduled job as failed. This will create a key under the FailedJobs keyspace
     /// and remove the job from ActiveJobs or QueuedJobs
-    /// TODO this should be atomic
     pub async fn fail_unscheduled_job(
         &self,
         job_id: &str,
         failure_reason: String,
+        clean_up_interval_in_secs: u64,
     ) -> Result<()> {
         debug!("Moving job {} from Active or Queue to Failed", job_id);
         let locks = self
@@ -360,7 +394,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 (Keyspace::FailedJobs, job_id),
             ])
             .await?;
-        with_locks(locks, self.fail_job_state(job_id, failure_reason)).await
+        with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
+
+        // spawn a delayed future to clean up the job data on the Scheduler
+        let state = self.state.clone();
+        let job_id_str = job_id.to_owned();
+        let active_job_cache = self.active_job_cache.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
+            Self::clean_up_job_data(state, active_job_cache, true, job_id_str).await
+        });
+
+        Ok(())
     }
 
     async fn fail_job_state(&self, job_id: &str, failure_reason: String) -> Result<()> {
@@ -389,7 +434,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             };
             result
         } else {
-            warn!("Fail to find job {} in the cache", job_id);
+            info!("Fail to find job {} in the cache", job_id);
             let status = JobStatus {
                 status: Some(job_status::Status::Failed(FailedJob {
                     error: failure_reason.clone(),
@@ -402,6 +447,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
 
         Ok(())
     }
+
     pub async fn update_job(&self, job_id: &str) -> Result<()> {
         debug!("Update job {} in Active", job_id);
         if let Some(graph) = self.get_active_execution_graph(job_id).await {
@@ -598,6 +644,25 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             .take(7)
             .collect()
     }
+
+    async fn clean_up_job_data(
+        state: Arc<dyn StateBackendClient>,
+        active_job_cache: ExecutionGraphCache,
+        failed: bool,
+        job_id: String,
+    ) -> Result<()> {
+        active_job_cache.remove(&job_id);
+        let keyspace = if failed {
+            Keyspace::FailedJobs
+        } else {
+            Keyspace::CompletedJobs
+        };
+
+        let lock = state.lock(keyspace.clone(), "").await?;
+        with_lock(lock, state.delete(keyspace, &job_id)).await?;
+
+        Ok(())
+    }
 }
 
 pub struct JobOverview {
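
clean_up_job_data is the scheduler-side half of the cleanup: evict the cached execution graph, then delete the persisted job entry from FailedJobs or CompletedJobs under a keyspace lock. A self-contained model of those two steps, with a DashMap standing in for the graph cache and a Mutex-guarded map for the state backend:

    use dashmap::DashMap;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// (keyspace, job_id) -> serialized job status; a stand-in for the backend.
    type Backend = Arc<Mutex<HashMap<(String, String), Vec<u8>>>>;
    type GraphCache = Arc<DashMap<String, ()>>;

    async fn clean_up_job_data(cache: GraphCache, backend: Backend, failed: bool, job_id: String) {
        // Step 1: evict the in-memory execution graph.
        cache.remove(&job_id);
        // Step 2: delete the persisted entry; the Mutex plays the role of
        // the keyspace lock taken via state.lock(...) in the real code.
        let keyspace = if failed { "FailedJobs" } else { "CompletedJobs" };
        backend.lock().await.remove(&(keyspace.to_string(), job_id));
    }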