Posted to commits@arrow.apache.org by nj...@apache.org on 2022/11/03 16:59:55 UTC

[arrow-ballista] branch master updated: Remove executor config executor_cleanup_enable and make the configuration name for executor cleanup more intuitive (#489)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 642ab6b1 Remove executor config executor_cleanup_enable and make the configuration name for executor cleanup more intuitive (#489)
642ab6b1 is described below

commit 642ab6b1ff7ec83b64bf5ec3c2594a320a7a1772
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Fri Nov 4 00:59:49 2022 +0800

    Remove executor config executor_cleanup_enable and make the configuration name for executor cleanup more intuitive (#489)
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/executor/executor_config_spec.toml |  14 +---
 ballista/executor/src/main.rs               | 116 ++++++++++++++++------------
 2 files changed, 70 insertions(+), 60 deletions(-)

diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml
index 06831b25..fe91f0c5 100644
--- a/ballista/executor/executor_config_spec.toml
+++ b/ballista/executor/executor_config_spec.toml
@@ -86,19 +86,13 @@ doc = "The task scheduing policy for the scheduler, possible values: pull-staged
 default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
 
 [[param]]
-name = "executor_cleanup_enable"
-type = "bool"
-doc = "Enable periodic cleanup of work_dir directories."
-default = "false"
-
-[[param]]
-name = "executor_cleanup_interval"
+name = "job_data_clean_up_interval_seconds"
 type = "u64"
-doc = "Controls the interval in seconds , which the worker cleans up old job dirs on the local machine."
-default = "1800"
+doc = "Controls the interval in seconds, which the worker cleans up old job dirs on the local machine. 0 means the clean up is disabled"
+default = "0"
 
 [[param]]
-name = "executor_cleanup_ttl"
+name = "job_data_ttl_seconds"
 type = "u64"
 doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained"
 default = "604800"
diff --git a/ballista/executor/src/main.rs b/ballista/executor/src/main.rs
index 6d737ab3..6d875457 100644
--- a/ballista/executor/src/main.rs
+++ b/ballista/executor/src/main.rs
@@ -17,10 +17,9 @@
 
 //! Ballista Rust executor binary.
 
-use chrono::{DateTime, Duration, Utc};
 use std::net::SocketAddr;
 use std::sync::Arc;
-use std::time::{Duration as Core_Duration, Instant};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 use std::{env, io};
 
 use anyhow::{Context, Result};
@@ -28,7 +27,7 @@ use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_executor::{execution_loop, executor_server};
 use log::{error, info, warn};
 use tempfile::TempDir;
-use tokio::fs::ReadDir;
+use tokio::fs::DirEntry;
 use tokio::signal;
 use tokio::{fs, time};
 use uuid::Uuid;
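
With the import changes above, the executor binary drops chrono in favor of std::time and
works with DirEntry values instead of ReadDir handles. As a standalone sketch (assuming the
same TTL semantics), the cutoff that used to be computed with chrono can be expressed as a
duration since the Unix epoch, which is how the rewritten satisfy_dir_ttl below compares it
against file modification times:

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Sketch: `now - ttl`, expressed as a duration since the Unix epoch.
    /// Returns None if the TTL reaches back before the epoch.
    fn cutoff_since_epoch(ttl_seconds: u64) -> Option<Duration> {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .ok()?
            .checked_sub(Duration::from_secs(ttl_seconds))
    }
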
@@ -246,14 +245,14 @@ async fn main() -> Result<()> {
         BallistaCodec::default();
 
     let scheduler_policy = opt.task_scheduling_policy;
-    let cleanup_ttl = opt.executor_cleanup_ttl;
+    let job_data_ttl_seconds = opt.job_data_ttl_seconds;
 
     // Graceful shutdown notification
     let shutdown_noti = ShutdownNotifier::new();
 
-    if opt.executor_cleanup_enable {
+    if opt.job_data_clean_up_interval_seconds > 0 {
         let mut interval_time =
-            time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+            time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
         let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
         let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
         tokio::spawn(async move {
@@ -261,7 +260,7 @@ async fn main() -> Result<()> {
             while !shuffle_cleaner_shutdown.is_shutdown() {
                 tokio::select! {
                     _ = interval_time.tick() => {
-                            if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
+                            if let Err(e) = clean_shuffle_data_loop(&work_dir, job_data_ttl_seconds).await
                         {
                             error!("Ballista executor fail to clean_shuffle_data {:?}", e)
                         }
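
The task spawned above follows a common tokio shape: a periodic tick and a graceful-shutdown
signal combined in one select!. ShutdownNotifier here is Ballista's own shutdown helper; a
self-contained sketch of the same pattern, using a plain broadcast channel as the shutdown
signal purely for illustration:

    use std::time::Duration;
    use tokio::sync::broadcast;
    use tokio::time;

    async fn periodic_cleanup(interval_secs: u64, mut shutdown: broadcast::Receiver<()>) {
        let mut ticker = time::interval(Duration::from_secs(interval_secs));
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    // Run the cleanup here; log errors instead of propagating them
                    // so a single failure does not stop the loop.
                }
                _ = shutdown.recv() => break,
            }
        }
    }
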
@@ -419,40 +418,41 @@ async fn check_services(
     }
 }
 
-/// This function will scheduled periodically for cleanup executor.
-/// Will only clean the dir under work_dir not include file
-async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+/// This function will be scheduled periodically to clean up the job shuffle data left on the executor.
+/// Only directories will be checked and cleaned.
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
     let mut dir = fs::read_dir(work_dir).await?;
     let mut to_deleted = Vec::new();
-    let mut need_delete_dir;
     while let Some(child) = dir.next_entry().await? {
         if let Ok(metadata) = child.metadata().await {
+            let child_path = child.path().into_os_string();
             // only delete the job dir
             if metadata.is_dir() {
-                let dir = fs::read_dir(child.path()).await?;
-                match check_modified_time_in_dirs(vec![dir], seconds).await {
-                    Ok(x) => match x {
-                        true => {
-                            need_delete_dir = child.path().into_os_string();
-                            to_deleted.push(need_delete_dir)
-                        }
-                        false => {}
-                    },
+                match satisfy_dir_ttl(child, seconds).await {
                     Err(e) => {
-                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                        error!(
+                            "Failed to check the TTL for directory {:?} due to {:?}",
+                            child_path, e
+                        )
                     }
+                    Ok(false) => to_deleted.push(child_path),
+                    Ok(_) => {}
                 }
+            } else {
+                warn!("{:?} under the working directory is not a directory and will be ignored during cleanup", child_path)
             }
         } else {
-            error!("Can not get metadata from file: {:?}", child)
+            error!("Failed to get metadata for file {:?}", child.path())
         }
     }
     info!(
-        "The work_dir {:?} that have not been modified for {:?} seconds will be deleted",
-        &to_deleted, seconds
+        "The directories {:?} have not been modified for {:?} seconds and will be deleted",
+        to_deleted, seconds
     );
     for del in to_deleted {
-        fs::remove_dir_all(del).await?;
+        if let Err(e) = fs::remove_dir_all(&del).await {
+            error!("Failed to remove the directory {:?} due to {}", del, e);
+        }
     }
     Ok(())
 }
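
The reworked loop above logs and skips entries it cannot inspect or delete instead of
aborting on the first error. One property worth pinning down in the test module is that a
freshly written job directory survives a cleanup pass when the TTL is generous; a sketch
(test name and layout are illustrative, assuming tokio's test macro and the anyhow::Result
already imported in this file):

    #[tokio::test]
    async fn fresh_job_dir_is_retained() -> Result<()> {
        let work_dir = TempDir::new()?;
        let job_dir = work_dir.path().join("job-1");
        std::fs::create_dir(&job_dir)?;
        std::fs::write(job_dir.join("shuffle.data"), b"bytes")?;

        // A just-created directory is newer than `now - 1 hour`, so it must survive.
        clean_shuffle_data_loop(work_dir.path().to_str().unwrap(), 3600).await?;
        assert!(job_dir.exists());
        Ok(())
    }
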
@@ -474,37 +474,53 @@ async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
 
     info!("The work_dir {:?} will be deleted", &to_deleted);
     for del in to_deleted {
-        fs::remove_dir_all(del).await?;
+        if let Err(e) = fs::remove_dir_all(&del).await {
+            error!("Failed to remove the directory {:?} due to {}", del, e);
+        }
     }
     Ok(())
 }
 
-/// Determines if a directory all files are older than cutoff seconds.
-async fn check_modified_time_in_dirs(
-    mut vec: Vec<ReadDir>,
-    ttl_seconds: i64,
-) -> Result<bool> {
-    let cutoff = Utc::now() - Duration::seconds(ttl_seconds);
-
-    while !vec.is_empty() {
-        let mut dir = vec.pop().unwrap();
-        while let Some(child) = dir.next_entry().await? {
-            let meta = child.metadata().await?;
-            if meta.is_dir() {
-                let dir = fs::read_dir(child.path()).await?;
-                // check in next loop
-                vec.push(dir);
-            } else {
-                let modified_time: DateTime<Utc> =
-                    meta.modified().map(chrono::DateTime::from)?;
-                if modified_time > cutoff {
-                    // if one file has been modified in ttl we won't delete the whole dir
-                    return Ok(false);
-                }
-            }
+/// Determines if a directory contains files newer than the cutoff time.
+/// If it returns true, the directory contains files newer than the cutoff time; it satisfies the TTL and should not be deleted.
+pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
+    let cutoff = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
+        .checked_sub(Duration::from_secs(ttl_seconds))
+        .expect("The cut off time went backwards");
+
+    let mut to_check = vec![dir];
+    while let Some(dir) = to_check.pop() {
+        // Check the ttl for the current directory first
+        if dir
+            .metadata()
+            .await?
+            .modified()?
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            > cutoff
+        {
+            return Ok(true);
+        }
+        // Check its children
+        let mut children = fs::read_dir(dir.path()).await?;
+        while let Some(child) = children.next_entry().await? {
+            let metadata = child.metadata().await?;
+            if metadata.is_dir() {
+                to_check.push(child);
+            } else if metadata
+                .modified()?
+                .duration_since(UNIX_EPOCH)
+                .expect("Time went backwards")
+                > cutoff
+            {
+                return Ok(true);
+            };
         }
     }
-    Ok(true)
+
+    Ok(false)
 }
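
Compared with the removed check_modified_time_in_dirs, satisfy_dir_ttl walks the tree
iteratively with an explicit stack of DirEntry values and returns true as soon as anything,
the directory itself included, is newer than the cutoff. A sketch of a direct test for that
behaviour (illustrative name, assuming tokio's test macro is available to the test module
below):

    #[tokio::test]
    async fn fresh_dir_satisfies_ttl() -> Result<()> {
        let work_dir = TempDir::new()?;
        std::fs::create_dir(work_dir.path().join("job-2"))?;

        // Fetch the DirEntry for the directory we just created.
        let mut entries = fs::read_dir(work_dir.path()).await?;
        let entry = entries.next_entry().await?.expect("one entry");

        // With a one-day TTL the fresh directory is newer than the cutoff,
        // so it satisfies the TTL and must not be deleted.
        assert!(satisfy_dir_ttl(entry, 24 * 3600).await?);
        Ok(())
    }
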
 
 #[cfg(test)]