You are viewing a plain-text version of this content. The canonical link to it ("here" in the original) was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/12 17:48:58 UTC
[arrow-ballista] branch master updated: Clean up job data on both Scheduler and Executor (#188)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new e42a6c9c Clean up job data on both Scheduler and Executor (#188)
e42a6c9c is described below
commit e42a6c9c2de4530adb22bd37a9bea17d60f2582f
Author: mingmwang <mi...@ebay.com>
AuthorDate: Thu Oct 13 01:48:53 2022 +0800
Clean up job data on both Scheduler and Executor (#188)
---
ballista/rust/core/proto/ballista.proto | 9 +++
ballista/rust/executor/src/executor_server.rs | 19 ++++-
.../rust/scheduler/src/scheduler_server/event.rs | 1 +
.../rust/scheduler/src/scheduler_server/grpc.rs | 23 +++++-
.../src/scheduler_server/query_stage_scheduler.rs | 48 +++++++++++-
ballista/rust/scheduler/src/state/backend/mod.rs | 2 +-
.../rust/scheduler/src/state/executor_manager.rs | 30 +++++++-
ballista/rust/scheduler/src/state/mod.rs | 2 +-
ballista/rust/scheduler/src/state/task_manager.rs | 85 +++++++++++++++++++---
9 files changed, 198 insertions(+), 21 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 9cb8cca4..67d0be57 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -975,6 +975,13 @@ message CancelTasksResult {
bool cancelled = 1;
}
+// Request for ExecutorGrpc.RemoveJobData: asks an executor to delete its local
+// data directory for the job identified by `job_id`.
+message RemoveJobDataParams {
+ string job_id = 1;
+}
+
+// Empty response for ExecutorGrpc.RemoveJobData.
+message RemoveJobDataResult {
+}
+
message RunningTaskInfo {
uint32 task_id = 1;
string job_id = 2;
@@ -1014,4 +1021,6 @@ service ExecutorGrpc {
rpc StopExecutor (StopExecutorParams) returns (StopExecutorResult) {}
rpc CancelTasks (CancelTasksParams) returns (CancelTasksResult) {}
+
+ rpc RemoveJobData (RemoveJobDataParams) returns (RemoveJobDataResult) {}
}
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index e850c895..acf5c873 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -19,6 +19,7 @@ use ballista_core::BALLISTA_VERSION;
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
@@ -37,7 +38,8 @@ use ballista_core::serde::protobuf::{
executor_metric, executor_status, CancelTasksParams, CancelTasksResult,
ExecutorMetric, ExecutorStatus, HeartBeatParams, LaunchMultiTaskParams,
LaunchMultiTaskResult, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
- StopExecutorParams, StopExecutorResult, TaskStatus, UpdateTaskStatusParams,
+ RemoveJobDataParams, RemoveJobDataResult, StopExecutorParams, StopExecutorResult,
+ TaskStatus, UpdateTaskStatusParams,
};
use ballista_core::serde::scheduler::PartitionId;
use ballista_core::serde::scheduler::TaskDefinition;
@@ -710,4 +712,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
Ok(Response::new(CancelTasksResult { cancelled }))
}
+
+    /// Delete the directory `<work_dir>/<job_id>` that this executor keeps for a job.
+    ///
+    /// The scheduler sends this RPC some time after a job has finished, failed or
+    /// been cancelled. A missing directory is not an error. An id that is not a
+    /// single plain path segment is rejected with `invalid_argument`, and I/O
+    /// failures while deleting are returned to the caller as a `Status`.
+    async fn remove_job_data(
+        &self,
+        request: Request<RemoveJobDataParams>,
+    ) -> Result<Response<RemoveJobDataResult>, Status> {
+        let job_id = request.into_inner().job_id;
+        // `job_id` comes straight off the wire and is joined onto the work dir, so it
+        // must be exactly one normal path segment. Otherwise an id such as "", "..",
+        // "../other" or "/" would make `remove_dir_all` below delete the work dir
+        // itself or a directory outside of it.
+        let mut components = std::path::Path::new(&job_id).components();
+        let is_single_segment = matches!(
+            (components.next(), components.next()),
+            (Some(std::path::Component::Normal(_)), None)
+        );
+        if !is_single_segment {
+            return Err(Status::invalid_argument(format!(
+                "Invalid job id {:?} for remove_job_data",
+                job_id
+            )));
+        }
+        let mut path = PathBuf::from(&self.executor.work_dir);
+        path.push(&job_id);
+        if path.is_dir() {
+            info!("Remove data for job {:?}", job_id);
+            // NOTE(review): blocking filesystem I/O on the async runtime thread;
+            // consider `spawn_blocking` if job directories can be large.
+            std::fs::remove_dir_all(&path)?;
+        }
+        Ok(Response::new(RemoveJobDataResult {}))
+    }
}
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs
index d01c0516..f206594f 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -39,6 +39,7 @@ pub enum QueryStageSchedulerEvent {
// For a job fails with its execution graph setting failed
JobRunningFailed(String, String),
JobUpdated(String),
+ JobCancel(String),
TaskUpdating(String, Vec<TaskStatus>),
ReservationOffering(Vec<ExecutorReservation>),
ExecutorLost(String, Option<String>),
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 9393ed78..ddd212a1 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -42,6 +42,8 @@ use log::{debug, error, info, warn};
use std::ops::Deref;
use std::sync::Arc;
+
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
@@ -529,10 +531,23 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
request: Request<CancelJobParams>,
) -> Result<Response<CancelJobResult>, Status> {
let job_id = request.into_inner().job_id;
- match self.state.cancel_job(&job_id).await {
- Ok(cancelled) => Ok(Response::new(CancelJobResult { cancelled })),
- Err(e) => Err(Status::internal(e.to_string())),
- }
+ info!("Received cancellation request for job {}", job_id);
+
+ self.query_stage_event_loop
+ .get_sender()
+ .map_err(|e| {
+ let msg = format!("Get query stage event loop error due to {:?}", e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?
+ .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+ .await
+ .map_err(|e| {
+ let msg = format!("Post to query stage event loop error due to {:?}", e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?;
+ Ok(Response::new(CancelJobResult { cancelled: true }))
}
}
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 96de88a8..4f7d12a1 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -16,6 +16,7 @@
// under the License.
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
use log::{debug, error, info};
@@ -33,6 +34,10 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::SchedulerState;
+// TODO move to configuration file
+/// Delay, in seconds, between a job reaching a terminal state (finished, failed or
+/// cancelled) and the clean-up of its data on the scheduler and on the executors.
+pub const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;
+
pub(crate) struct QueryStageScheduler<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
@@ -131,30 +136,67 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
error!("Job {} failed: {}", job_id, failure_reason);
self.state
.task_manager
- .fail_unscheduled_job(&job_id, failure_reason)
+ .fail_unscheduled_job(
+ &job_id,
+ failure_reason,
+ CLEANUP_FINISHED_JOB_DELAY_SECS,
+ )
.await?;
}
QueryStageSchedulerEvent::JobFinished(job_id) => {
info!("Job {} success", job_id);
- self.state.task_manager.succeed_job(&job_id).await?;
+ self.state
+ .task_manager
+ .succeed_job(&job_id, CLEANUP_FINISHED_JOB_DELAY_SECS)
+ .await?;
+ let executor_manager = self.state.executor_manager.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(
+ CLEANUP_FINISHED_JOB_DELAY_SECS,
+ ))
+ .await;
+ executor_manager.clean_up_executors_data(job_id).await;
+ });
}
QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason) => {
error!("Job {} running failed", job_id);
let tasks = self
.state
.task_manager
- .abort_job(&job_id, failure_reason)
+ .abort_job(&job_id, failure_reason, CLEANUP_FINISHED_JOB_DELAY_SECS)
.await?;
if !tasks.is_empty() {
tx_event
.post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
.await?;
}
+ let executor_manager = self.state.executor_manager.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(
+ CLEANUP_FINISHED_JOB_DELAY_SECS,
+ ))
+ .await;
+ executor_manager.clean_up_executors_data(job_id).await;
+ });
}
QueryStageSchedulerEvent::JobUpdated(job_id) => {
info!("Job {} Updated", job_id);
self.state.task_manager.update_job(&job_id).await?;
}
+            QueryStageSchedulerEvent::JobCancel(job_id) => {
+                info!("Job {} cancelled", job_id);
+                // `cancel_job` fails the job state and returns the tasks that were still
+                // running. Those must be cancelled on the executors explicitly, exactly as
+                // the `JobRunningFailed` arm does; dropping the Vec leaves them running.
+                let tasks = self
+                    .state
+                    .task_manager
+                    .cancel_job(&job_id, CLEANUP_FINISHED_JOB_DELAY_SECS)
+                    .await?;
+                if !tasks.is_empty() {
+                    tx_event
+                        .post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
+                        .await?;
+                }
+                // Remove the job's data from the executors after the same delay the
+                // task manager uses for its scheduler-side clean-up.
+                let executor_manager = self.state.executor_manager.clone();
+                tokio::spawn(async move {
+                    tokio::time::sleep(Duration::from_secs(
+                        CLEANUP_FINISHED_JOB_DELAY_SECS,
+                    ))
+                    .await;
+                    executor_manager.clean_up_executors_data(job_id).await;
+                });
+            }
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
let num_status = tasks_status.len();
match self
diff --git a/ballista/rust/scheduler/src/state/backend/mod.rs b/ballista/rust/scheduler/src/state/backend/mod.rs
index 85c0f34d..cbba3ed1 100644
--- a/ballista/rust/scheduler/src/state/backend/mod.rs
+++ b/ballista/rust/scheduler/src/state/backend/mod.rs
@@ -49,7 +49,7 @@ impl parse_arg::ParseArgFromStr for StateBackend {
}
}
-#[derive(Debug, Eq, PartialEq, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum Keyspace {
Executors,
ActiveJobs,
diff --git a/ballista/rust/scheduler/src/state/executor_manager.rs b/ballista/rust/scheduler/src/state/executor_manager.rs
index 322e5a7b..70fc7a0a 100644
--- a/ballista/rust/scheduler/src/state/executor_manager.rs
+++ b/ballista/rust/scheduler/src/state/executor_manager.rs
@@ -27,12 +27,13 @@ use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
executor_status, CancelTasksParams, ExecutorHeartbeat, ExecutorStatus,
+ RemoveJobDataParams,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::utils::create_grpc_client_connection;
use dashmap::{DashMap, DashSet};
use futures::StreamExt;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tonic::transport::Channel;
@@ -258,6 +259,33 @@ impl ExecutorManager {
Ok(())
}
+    /// Send rpc to Executors to clean up the job data
+    ///
+    /// Issues one `RemoveJobData` request to every executor returned by
+    /// `get_alive_executors_within_one_minute`. Each request runs in its own detached
+    /// task, so this method does not wait for the executors to respond. Clean-up is
+    /// best-effort: failures to connect or to call the RPC are logged and otherwise
+    /// ignored.
+    pub async fn clean_up_executors_data(&self, job_id: String) {
+        let alive_executors = self.get_alive_executors_within_one_minute();
+        for executor in alive_executors {
+            let mut client = match self.get_client(&executor).await {
+                Ok(client) => client,
+                Err(err) => {
+                    warn!(
+                        "Failed to get client for Executor {} due to {:?}",
+                        executor, err
+                    );
+                    continue;
+                }
+            };
+            let params = RemoveJobDataParams {
+                job_id: job_id.clone(),
+            };
+            tokio::spawn(async move {
+                if let Err(err) = client.remove_job_data(params).await {
+                    warn!(
+                        "Failed to call remove_job_data on Executor {} due to {:?}",
+                        executor, err
+                    );
+                }
+            });
+        }
+    }
+
pub async fn get_client(
&self,
executor_id: &str,
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 8cdd74d6..57fdc3c0 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -276,7 +276,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
info!("Received cancellation request for job {}", job_id);
- match self.task_manager.cancel_job(job_id).await {
+ match self.task_manager.cancel_job(job_id, 300).await {
Ok(tasks) => {
self.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
let msg = format!("Error to cancel running tasks when cancelling job {} due to {:?}", job_id, e);
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index acafbcd8..ddd5ba9c 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -44,6 +44,7 @@ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
+use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
type ExecutionGraphCache = Arc<DashMap<String, Arc<RwLock<ExecutionGraph>>>>;
@@ -287,7 +288,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
/// Mark a job to success. This will create a key under the CompletedJobs keyspace
/// and remove the job from ActiveJobs
- pub async fn succeed_job(&self, job_id: &str) -> Result<()> {
+ pub(crate) async fn succeed_job(
+ &self,
+ job_id: &str,
+ clean_up_interval: u64,
+ ) -> Result<()> {
debug!("Moving job {} from Active to Success", job_id);
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
with_lock(lock, self.state.delete(Keyspace::ActiveJobs, job_id)).await?;
@@ -301,17 +306,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
.await?;
} else {
error!("Job {} has not finished and cannot be completed", job_id);
+ return Ok(());
}
} else {
warn!("Fail to find job {} in the cache", job_id);
}
+ // spawn a delayed future to clean up job data on both Scheduler and Executors
+ let state = self.state.clone();
+ let job_id_str = job_id.to_owned();
+ let active_job_cache = self.active_job_cache.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(clean_up_interval)).await;
+ Self::clean_up_job_data(state, active_job_cache, false, job_id_str).await
+ });
+
Ok(())
}
/// Cancel the job and return a Vec of running tasks need to cancel
- pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<Vec<RunningTaskInfo>> {
- self.abort_job(job_id, "Cancelled".to_owned()).await
+    ///
+    /// Delegates to `abort_job` with the failure reason "Cancelled". The job's
+    /// scheduler-side data is removed `clean_up_interval_in_secs` seconds later by a
+    /// task that `abort_job` spawns.
+ pub(crate) async fn cancel_job(
+ &self,
+ job_id: &str,
+ clean_up_interval_in_secs: u64,
+ ) -> Result<Vec<RunningTaskInfo>> {
+ self.abort_job(job_id, "Cancelled".to_owned(), clean_up_interval_in_secs)
+ .await
}
/// Abort the job and return a Vec of running tasks need to cancel
@@ -319,6 +339,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
&self,
job_id: &str,
failure_reason: String,
+ clean_up_interval_in_secs: u64,
) -> Result<Vec<RunningTaskInfo>> {
let locks = self
.state
@@ -327,7 +348,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
(Keyspace::FailedJobs, job_id),
])
.await?;
- if let Some(graph) = self.get_active_execution_graph(job_id).await {
+ let tasks_to_cancel = if let Some(graph) =
+ self.get_active_execution_graph(job_id).await
+ {
let running_tasks = graph.read().await.running_tasks();
info!(
"Cancelling {} running tasks for job {}",
@@ -335,22 +358,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
job_id
);
with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
- Ok(running_tasks)
+ running_tasks
} else {
// TODO listen the job state update event and fix task cancelling
warn!("Fail to find job {} in the cache, unable to cancel tasks for job, fail the job state only.", job_id);
with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
- Ok(vec![])
- }
+ vec![]
+ };
+
+ // spawn a delayed future to clean up job data on both Scheduler and Executors
+ let state = self.state.clone();
+ let job_id_str = job_id.to_owned();
+ let active_job_cache = self.active_job_cache.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
+ Self::clean_up_job_data(state, active_job_cache, true, job_id_str).await
+ });
+
+ Ok(tasks_to_cancel)
}
/// Mark a unscheduled job as failed. This will create a key under the FailedJobs keyspace
/// and remove the job from ActiveJobs or QueuedJobs
- /// TODO this should be atomic
pub async fn fail_unscheduled_job(
&self,
job_id: &str,
failure_reason: String,
+ clean_up_interval_in_secs: u64,
) -> Result<()> {
debug!("Moving job {} from Active or Queue to Failed", job_id);
let locks = self
@@ -360,7 +394,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
(Keyspace::FailedJobs, job_id),
])
.await?;
- with_locks(locks, self.fail_job_state(job_id, failure_reason)).await
+ with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
+
+ // spawn a delayed future to clean up job data on Scheduler
+ let state = self.state.clone();
+ let job_id_str = job_id.to_owned();
+ let active_job_cache = self.active_job_cache.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
+ Self::clean_up_job_data(state, active_job_cache, true, job_id_str).await
+ });
+
+ Ok(())
}
async fn fail_job_state(&self, job_id: &str, failure_reason: String) -> Result<()> {
@@ -389,7 +434,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
};
result
} else {
- warn!("Fail to find job {} in the cache", job_id);
+ info!("Fail to find job {} in the cache", job_id);
let status = JobStatus {
status: Some(job_status::Status::Failed(FailedJob {
error: failure_reason.clone(),
@@ -402,6 +447,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
Ok(())
}
+
pub async fn update_job(&self, job_id: &str) -> Result<()> {
debug!("Update job {} in Active", job_id);
if let Some(graph) = self.get_active_execution_graph(job_id).await {
@@ -598,6 +644,25 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
.take(7)
.collect()
}
+
+    /// Forget a job that has reached a terminal state.
+    ///
+    /// Evicts its execution graph from the in-memory `active_job_cache`, then deletes
+    /// its entry from the `FailedJobs` keyspace (when `failed` is true) or from the
+    /// `CompletedJobs` keyspace of the state backend, holding that keyspace's lock
+    /// for the delete. Errors from locking or deleting are returned to the caller.
+    async fn clean_up_job_data(
+        state: Arc<dyn StateBackendClient>,
+        active_job_cache: ExecutionGraphCache,
+        failed: bool,
+        job_id: String,
+    ) -> Result<()> {
+        active_job_cache.remove(&job_id);
+
+        let target_keyspace = match failed {
+            true => Keyspace::FailedJobs,
+            false => Keyspace::CompletedJobs,
+        };
+        // Both `lock` and `delete` take the keyspace by value, hence the clone.
+        let keyspace_lock = state.lock(target_keyspace.clone(), "").await?;
+        with_lock(keyspace_lock, state.delete(target_keyspace, &job_id)).await?;
+        Ok(())
+    }
}
pub struct JobOverview {