You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2022/11/01 01:16:50 UTC

[arrow-ballista] branch master updated: Add some validation for remove_job_data in the executor server (#468)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 68d513da Add some validation for remove_job_data in the executor server (#468)
68d513da is described below

commit 68d513da6a99b867d21bf72d2d8bfb628e917512
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Tue Nov 1 09:16:45 2022 +0800

    Add some validation for remove_job_data in the executor server (#468)
    
    * Add some validation for remove_job_data in the executor server
    
    * Fix for comments of subdirectory check
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/executor/src/executor_server.rs | 93 +++++++++++++++++++++++++++++---
 1 file changed, 86 insertions(+), 7 deletions(-)

diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index ad3871fa..c91bfd43 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -19,7 +19,7 @@ use ballista_core::BALLISTA_VERSION;
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::ops::Deref;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
@@ -720,13 +720,92 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
         request: Request<RemoveJobDataParams>,
     ) -> Result<Response<RemoveJobDataResult>, Status> {
         let job_id = request.into_inner().job_id;
-        let work_dir = self.executor.work_dir.clone();
-        let mut path = PathBuf::from(work_dir);
-        path.push(job_id.clone());
-        if path.is_dir() {
-            info!("Remove data for job {:?}", job_id);
-            std::fs::remove_dir_all(&path)?;
+        info!("Remove data for job {:?}", job_id);
+
+        let work_dir = PathBuf::from(&self.executor.work_dir);
+        let mut path = work_dir.clone();
+        path.push(&job_id);
+
+        // Verify it's an existing directory
+        if !path.is_dir() {
+            return if !path.exists() {
+                Err(Status::invalid_argument(format!(
+                    "Path {:?} does not exist!!!",
+                    path
+                )))
+            } else {
+                Err(Status::invalid_argument(format!(
+                    "Path {:?} is not for a directory!!!",
+                    path
+                )))
+            };
         }
+
+        if !is_subdirectory(path.as_path(), work_dir.as_path()) {
+            return Err(Status::invalid_argument(format!(
+                "Path {:?} is not a subdirectory of {:?}!!!",
+                path, work_dir
+            )));
+        }
+
+        std::fs::remove_dir_all(&path)?;
+
         Ok(Response::new(RemoveJobDataResult {}))
     }
 }
+
+// Check whether `path` resolves to a location strictly inside `base_path`; the base directory itself does not qualify
+fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
+    if let (Ok(path), Ok(base_path)) = (path.canonicalize(), base_path.canonicalize()) {
+        if let Some(parent_path) = path.parent() { // the parent is tested, so `path == base_path` is rejected
+            parent_path.starts_with(base_path)
+        } else {
+            false // the canonical path has no parent
+        }
+    } else {
+        false // at least one of the two paths could not be canonicalized
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::executor_server::is_subdirectory;
+    use std::fs;
+    use std::path::{Path, PathBuf};
+    use tempfile::TempDir;
+
+    #[tokio::test] // NOTE(review): the test body contains no `.await`; a plain `#[test]` would presumably suffice — confirm
+    async fn test_is_subdirectory() {
+        let base_dir = TempDir::new().unwrap().into_path(); // NOTE(review): `into_path()` presumably leaves the directory on disk after the test — confirm
+
+        // A regular job directory created under the base directory must be accepted
+        {
+            let job_path = prepare_testing_job_directory(&base_dir, "job_a");
+            assert!(is_subdirectory(&job_path, base_dir.as_path()));
+        }
+
+        // Job ids "" and "." must be rejected
+        {
+            let job_path = prepare_testing_job_directory(&base_dir, "");
+            assert!(!is_subdirectory(&job_path, base_dir.as_path()));
+
+            let job_path = prepare_testing_job_directory(&base_dir, ".");
+            assert!(!is_subdirectory(&job_path, base_dir.as_path()));
+        }
+
+        // A ".." job id, which would point outside the base directory, must be rejected
+        {
+            let job_path = prepare_testing_job_directory(&base_dir, "..");
+            assert!(!is_subdirectory(&job_path, base_dir.as_path()));
+        }
+    }
+
+    fn prepare_testing_job_directory(base_dir: &Path, job_id: &str) -> PathBuf {
+        let mut path = base_dir.to_path_buf();
+        path.push(job_id);
+        if !path.exists() { // create the directory only when nothing exists at that path yet
+            fs::create_dir(&path).unwrap();
+        }
+        path
+    }
+}