You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/09 05:38:43 UTC

[GitHub] [arrow-datafusion] realno commented on a change in pull request #1783: Enable periodic cleanup of work_dir directories in ballista executor

realno commented on a change in pull request #1783:
URL: https://github.com/apache/arrow-datafusion/pull/1783#discussion_r802280125



##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {

Review comment:
       Should we avoid panic here?

##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {
+        if let Ok(metadata) = child.metadata().await {
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("can not get meta from file{:?}", child)
+        }
+    }
+    info!(
+        "Executor work_dir {:?} not modified in {:?} seconds will be deleted ",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines if a directory all files are older than cutoff seconds.
+async fn check_modified_time_in_dirs(
+    mut vec: Vec<ReadDir>,
+    seconds: i64,
+) -> Result<bool> {
+    let cutoff = Utc::now() - Duration::seconds(seconds);
+
+    while !vec.is_empty() {
+        let mut dir = vec.pop().unwrap();
+        while let Some(child) = dir.next_entry().await? {
+            let meta = child.metadata().await?;
+            match meta.is_dir() {
+                true => {
+                    let dir = fs::read_dir(child.path()).await?;
+                    vec.push(dir);

Review comment:
       I think there may be a potential race condition here: after the dir is pushed, new write operations could still occur in it.

##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {
+        if let Ok(metadata) = child.metadata().await {
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("can not get meta from file{:?}", child)
+        }
+    }
+    info!(
+        "Executor work_dir {:?} not modified in {:?} seconds will be deleted ",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines if a directory all files are older than cutoff seconds.
+async fn check_modified_time_in_dirs(

Review comment:
       Is it possible that, during a long-running job, all the files are over the threshold but the job is still not finished?

##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -112,6 +116,21 @@ async fn main() -> Result<()> {
         .context("Could not connect to scheduler")?;
 
     let scheduler_policy = opt.task_scheduling_policy;
+    let cleanup_ttl = opt.executor_cleanup_ttl;
+
+    if opt.executor_cleanup_enable {
+        let interval = opt.executor_cleanup_interval;
+        let mut interval_time = time::interval(Core_Duration::from_secs(interval));
+        tokio::spawn(async move {
+            loop {
+                interval_time.tick().await;
+                clean_shuffle_data_loop(&work_dir, cleanup_ttl)
+                    .await
+                    .unwrap();

Review comment:
       I am wondering whether we should handle the error here; otherwise the cleanup thread may be killed if it panics.

##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {
+        if let Ok(metadata) = child.metadata().await {
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("can not get meta from file{:?}", child)
+        }
+    }
+    info!(
+        "Executor work_dir {:?} not modified in {:?} seconds will be deleted ",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines if a directory all files are older than cutoff seconds.
+async fn check_modified_time_in_dirs(
+    mut vec: Vec<ReadDir>,
+    seconds: i64,
+) -> Result<bool> {
+    let cutoff = Utc::now() - Duration::seconds(seconds);
+
+    while !vec.is_empty() {
+        let mut dir = vec.pop().unwrap();
+        while let Some(child) = dir.next_entry().await? {
+            let meta = child.metadata().await?;
+            match meta.is_dir() {
+                true => {
+                    let dir = fs::read_dir(child.path()).await?;
+                    vec.push(dir);
+                }
+                false => {
+                    let modified_time: DateTime<Utc> =
+                        meta.modified().map(chrono::DateTime::from).unwrap();

Review comment:
       Should we allow panic?

##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {
+        if let Ok(metadata) = child.metadata().await {
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("can not get meta from file{:?}", child)
+        }
+    }
+    info!(
+        "Executor work_dir {:?} not modified in {:?} seconds will be deleted ",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines if a directory all files are older than cutoff seconds.
+async fn check_modified_time_in_dirs(
+    mut vec: Vec<ReadDir>,
+    seconds: i64,
+) -> Result<bool> {
+    let cutoff = Utc::now() - Duration::seconds(seconds);
+
+    while !vec.is_empty() {
+        let mut dir = vec.pop().unwrap();

Review comment:
       Should we allow panic?

##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {
+        if let Ok(metadata) = child.metadata().await {
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("can not get meta from file{:?}", child)
+        }
+    }
+    info!(
+        "Executor work_dir {:?} not modified in {:?} seconds will be deleted ",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines if a directory all files are older than cutoff seconds.
+async fn check_modified_time_in_dirs(

Review comment:
       If possible, maybe we could also add a flag in the job folder to indicate the job's status.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org