You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2022/11/03 16:59:55 UTC
[arrow-ballista] branch master updated: Remove executor config executor_cleanup_enable and make the configuration name for executor cleanup more intuitive (#489)
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 642ab6b1 Remove executor config executor_cleanup_enable and make the configuration name for executor cleanup more intuitive (#489)
642ab6b1 is described below
commit 642ab6b1ff7ec83b64bf5ec3c2594a320a7a1772
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Fri Nov 4 00:59:49 2022 +0800
Remove executor config executor_cleanup_enable and make the configuration name for executor cleanup more intuitive (#489)
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/executor/executor_config_spec.toml | 14 +---
ballista/executor/src/main.rs | 116 ++++++++++++++++------------
2 files changed, 70 insertions(+), 60 deletions(-)
diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml
index 06831b25..fe91f0c5 100644
--- a/ballista/executor/executor_config_spec.toml
+++ b/ballista/executor/executor_config_spec.toml
@@ -86,19 +86,13 @@ doc = "The task scheduing policy for the scheduler, possible values: pull-staged
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
[[param]]
-name = "executor_cleanup_enable"
-type = "bool"
-doc = "Enable periodic cleanup of work_dir directories."
-default = "false"
-
-[[param]]
-name = "executor_cleanup_interval"
+name = "job_data_clean_up_interval_seconds"
type = "u64"
-doc = "Controls the interval in seconds , which the worker cleans up old job dirs on the local machine."
-default = "1800"
+doc = "Controls the interval in seconds, which the worker cleans up old job dirs on the local machine. 0 means the clean up is disabled"
+default = "0"
[[param]]
-name = "executor_cleanup_ttl"
+name = "job_data_ttl_seconds"
type = "u64"
doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained"
default = "604800"
diff --git a/ballista/executor/src/main.rs b/ballista/executor/src/main.rs
index 6d737ab3..6d875457 100644
--- a/ballista/executor/src/main.rs
+++ b/ballista/executor/src/main.rs
@@ -17,10 +17,9 @@
//! Ballista Rust executor binary.
-use chrono::{DateTime, Duration, Utc};
use std::net::SocketAddr;
use std::sync::Arc;
-use std::time::{Duration as Core_Duration, Instant};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{env, io};
use anyhow::{Context, Result};
@@ -28,7 +27,7 @@ use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_executor::{execution_loop, executor_server};
use log::{error, info, warn};
use tempfile::TempDir;
-use tokio::fs::ReadDir;
+use tokio::fs::DirEntry;
use tokio::signal;
use tokio::{fs, time};
use uuid::Uuid;
@@ -246,14 +245,14 @@ async fn main() -> Result<()> {
BallistaCodec::default();
let scheduler_policy = opt.task_scheduling_policy;
- let cleanup_ttl = opt.executor_cleanup_ttl;
+ let job_data_ttl_seconds = opt.job_data_ttl_seconds;
// Graceful shutdown notification
let shutdown_noti = ShutdownNotifier::new();
- if opt.executor_cleanup_enable {
+ if opt.job_data_clean_up_interval_seconds > 0 {
let mut interval_time =
- time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+ time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
tokio::spawn(async move {
@@ -261,7 +260,7 @@ async fn main() -> Result<()> {
while !shuffle_cleaner_shutdown.is_shutdown() {
tokio::select! {
_ = interval_time.tick() => {
- if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
+ if let Err(e) = clean_shuffle_data_loop(&work_dir, job_data_ttl_seconds).await
{
error!("Ballista executor fail to clean_shuffle_data {:?}", e)
}
@@ -419,40 +418,41 @@ async fn check_services(
}
}
-/// This function will scheduled periodically for cleanup executor.
-/// Will only clean the dir under work_dir not include file
-async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+/// This function will be scheduled periodically for cleanup the job shuffle data left on the executor.
+/// Only directories will be checked cleaned.
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
- let mut need_delete_dir;
while let Some(child) = dir.next_entry().await? {
if let Ok(metadata) = child.metadata().await {
+ let child_path = child.path().into_os_string();
// only delete the job dir
if metadata.is_dir() {
- let dir = fs::read_dir(child.path()).await?;
- match check_modified_time_in_dirs(vec![dir], seconds).await {
- Ok(x) => match x {
- true => {
- need_delete_dir = child.path().into_os_string();
- to_deleted.push(need_delete_dir)
- }
- false => {}
- },
+ match satisfy_dir_ttl(child, seconds).await {
Err(e) => {
- error!("Fail in clean_shuffle_data_loop {:?}", e)
+ error!(
+ "Fail to check ttl for the directory {:?} due to {:?}",
+ child_path, e
+ )
}
+ Ok(false) => to_deleted.push(child_path),
+ Ok(_) => {}
}
+ } else {
+ warn!("{:?} under the working directory is a not a directory and will be ignored when doing cleanup", child_path)
}
} else {
- error!("Can not get metadata from file: {:?}", child)
+ error!("Fail to get metadata for file {:?}", child.path())
}
}
info!(
- "The work_dir {:?} that have not been modified for {:?} seconds will be deleted",
- &to_deleted, seconds
+ "The directories {:?} that have not been modified for {:?} seconds so that they will be deleted",
+ to_deleted, seconds
);
for del in to_deleted {
- fs::remove_dir_all(del).await?;
+ if let Err(e) = fs::remove_dir_all(&del).await {
+ error!("Fail to remove the directory {:?} due to {}", del, e);
+ }
}
Ok(())
}
@@ -474,37 +474,53 @@ async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
info!("The work_dir {:?} will be deleted", &to_deleted);
for del in to_deleted {
- fs::remove_dir_all(del).await?;
+ if let Err(e) = fs::remove_dir_all(&del).await {
+ error!("Fail to remove the directory {:?} due to {}", del, e);
+ }
}
Ok(())
}
-/// Determines if a directory all files are older than cutoff seconds.
-async fn check_modified_time_in_dirs(
- mut vec: Vec<ReadDir>,
- ttl_seconds: i64,
-) -> Result<bool> {
- let cutoff = Utc::now() - Duration::seconds(ttl_seconds);
-
- while !vec.is_empty() {
- let mut dir = vec.pop().unwrap();
- while let Some(child) = dir.next_entry().await? {
- let meta = child.metadata().await?;
- if meta.is_dir() {
- let dir = fs::read_dir(child.path()).await?;
- // check in next loop
- vec.push(dir);
- } else {
- let modified_time: DateTime<Utc> =
- meta.modified().map(chrono::DateTime::from)?;
- if modified_time > cutoff {
- // if one file has been modified in ttl we won't delete the whole dir
- return Ok(false);
- }
- }
+/// Determines if a directory contains files newer than the cutoff time.
+/// If return true, it means the directory contains files newer than the cutoff time. It satisfy the ttl and should not be deleted.
+pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
+ let cutoff = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards")
+ .checked_sub(Duration::from_secs(ttl_seconds))
+ .expect("The cut off time went backwards");
+
+ let mut to_check = vec![dir];
+ while let Some(dir) = to_check.pop() {
+ // Check the ttl for the current directory first
+ if dir
+ .metadata()
+ .await?
+ .modified()?
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards")
+ > cutoff
+ {
+ return Ok(true);
+ }
+ // Check its children
+ let mut children = fs::read_dir(dir.path()).await?;
+ while let Some(child) = children.next_entry().await? {
+ let metadata = child.metadata().await?;
+ if metadata.is_dir() {
+ to_check.push(child);
+ } else if metadata
+ .modified()?
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards")
+ > cutoff
+ {
+ return Ok(true);
+ };
}
}
- Ok(true)
+
+ Ok(false)
}
#[cfg(test)]