Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/07/14 03:42:43 UTC

[incubator-kyuubi] branch master updated: Refactor Kyuubi kubernetes block cleaner (#794)

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new bd42d8a  Refactor Kyuubi kubernetes block cleaner (#794)
bd42d8a is described below

commit bd42d8a2f8b46852bf9bfc30032600f8a680102a
Author: ulysses <ul...@gmail.com>
AuthorDate: Wed Jul 14 11:42:06 2021 +0800

    Refactor Kyuubi kubernetes block cleaner (#794)
    
    Refactor Kyuubi kubernetes block cleaner
---
 build/dist                                         |  10 +-
 pom.xml                                            |   8 +-
 .../scala/org/apache/kyuubi/tools/Constants.scala  |  26 ---
 .../kyuubi/tools/KubernetesSparkBlockCleaner.scala | 206 ---------------------
 .../kubernetes/docker}/Dockerfile                  |   3 +-
 .../kubernetes/docker}/entrypoint.sh               |   0
 .../kubernetes}/spark-block-cleaner.yml            |  14 +-
 tools/{kubernetes => }/spark-block-cleaner/pom.xml |  10 +-
 .../kyuubi/tools/KubernetesSparkBlockCleaner.scala | 206 +++++++++++++++++++++
 .../KubernetesSparkBlockCleanerSuite.scala         | 109 +++++++++++
 10 files changed, 344 insertions(+), 248 deletions(-)

diff --git a/build/dist b/build/dist
index 216914e..03d54ca 100755
--- a/build/dist
+++ b/build/dist
@@ -179,7 +179,6 @@ mkdir -p "$DISTDIR/pid"
 mkdir -p "$DISTDIR/logs"
 mkdir -p "$DISTDIR/work"
 mkdir -p "$DISTDIR/externals/engines/spark"
-mkdir -p "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
 echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
 echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
 echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
@@ -195,9 +194,12 @@ cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR
 cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine-$VERSION.jar" "$DISTDIR/externals/engines/spark"
 
 # Copy kyuubi tools
-cp -r "$KYUUBI_HOME/tools/kubernetes/docker/" "$DISTDIR/tools/kubernetes"
-cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/"
-cp "$KYUUBI_HOME/tools/kubernetes/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
+if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" ]]; then
+  mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes/docker"
+  mkdir -p "$DISTDIR/tools/spark-block-cleaner/jars"
+  cp -r "$KYUUBI_HOME/tools/spark-block-cleaner/kubernetes/" "$DISTDIR/tools/spark-block-cleaner/kubernetes/"
+  cp "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/spark-block-cleaner/jars/"
+fi
 
 # Copy Kyuubi extension
 SPARK_MID_VERSION=${SPARK_VERSION%.*}
diff --git a/pom.xml b/pom.xml
index 107e14a..823e9df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,6 @@
         <module>kyuubi-main</module>
         <module>kyuubi-metrics</module>
         <module>kyuubi-zookeeper</module>
-        <module>tools/kubernetes/spark-block-cleaner</module>
     </modules>
     <packaging>pom</packaging>
 
@@ -1694,5 +1693,12 @@
                 <module>dev/kyuubi-extension-spark_3.1</module>
             </modules>
         </profile>
+
+        <profile>
+            <id>spark-block-cleaner</id>
+            <modules>
+                <module>tools/spark-block-cleaner</module>
+            </modules>
+        </profile>
     </profiles>
 </project>
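
With this change the cleaner module is no longer built by default: it moves behind the new spark-block-cleaner Maven profile, and build/dist copies the jar into the distribution only when it has actually been built. A minimal sketch of producing a distribution that includes the cleaner, assuming the repository's usual build/mvn and build/dist entry points and that extra Maven options are forwarded as shown (example invocation, not part of this commit):

    # build the module by activating the new profile
    build/mvn clean package -Pspark-block-cleaner -DskipTests

    # package a distribution; the jar then lands under
    # $DISTDIR/tools/spark-block-cleaner/jars per the build/dist change above
    build/dist --tgz -Pspark-block-cleaner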
diff --git a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala
deleted file mode 100644
index 0432969..0000000
--- a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.tools
-
-object Constants {
-  val CACHE_DIRS_KEY = "CACHE_DIRS"
-  val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME"
-  val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD"
-  val SLEEP_TIME_KEY = "SLEEP_TIME"
-  val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME"
-}
diff --git a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala
deleted file mode 100644
index 6420700..0000000
--- a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.tools
-
-import java.io.File
-import java.nio.file.{Files, Paths}
-import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
-
-import org.apache.kyuubi.Logging
-import org.apache.kyuubi.tools.Constants._
-/*
-* Spark storage shuffle data as the following structure.
-*
-* local-dir1/
-*   blockmgr-uuid/
-*     hash-sub-dir/
-*       shuffle-data
-*       shuffle-index
-*
-* local-dir2/
-*   blockmgr-uuid/
-*     hash-sub-dir/
-*       shuffle-data
-*       shuffle-index
-*
-* ...
-*/
-object KubernetesSparkBlockCleaner extends Logging {
-  private val envMap = System.getenv()
-
-  private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY,
-    "60").toInt
-  private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY,
-    "604800000").toLong
-  private val sleepTime = envMap.getOrDefault(SLEEP_TIME_KEY,
-    "3600000").toLong
-  private val deepCleanFileExpiredTime = envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY,
-    "432000000").toLong
-  private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) {
-    envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
-  } else {
-    throw new IllegalArgumentException(s"the env ${CACHE_DIRS_KEY} can not be null")
-  }
-
-  def doClean(dir: File, time: Long) {
-    info(s"start clean ${dir.getName} with fileExpiredTime ${time}")
-
-    // clean blockManager shuffle file
-    dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr"))
-      .foreach(blockManagerDir => {
-
-        info(s"start check blockManager dir ${blockManagerDir.getName}")
-        // check blockManager directory
-        blockManagerDir.listFiles.filter(_.isDirectory).foreach(subDir => {
-
-          info(s"start check sub dir ${subDir.getName}")
-          // check sub directory
-          subDir.listFiles.foreach(file => checkAndDeleteFIle(file, time))
-          // delete empty sub directory
-          checkAndDeleteDir(subDir)
-        })
-        // delete empty blockManager directory
-        checkAndDeleteDir(blockManagerDir)
-      })
-
-    // clean spark cache file
-    dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark"))
-      .foreach(cacheDir => {
-        info(s"start check cache dir ${cacheDir.getName}")
-        cacheDir.listFiles.foreach(file => checkAndDeleteFIle(file, time))
-        // delete empty spark cache file
-        checkAndDeleteDir(cacheDir)
-      })
-  }
-
-  def checkAndDeleteFIle(file: File, time: Long): Unit = {
-    info(s"check file ${file.getName}")
-    if (System.currentTimeMillis() - file.lastModified() > time) {
-      if (file.delete()) {
-        info(s"delete file ${file.getName} success")
-      } else {
-        warn(s"delete file ${file.getName} fail")
-      }
-    }
-  }
-
-  def checkAndDeleteDir(dir: File): Unit = {
-    if (dir.listFiles.isEmpty) {
-      if (dir.delete()) {
-        info(s"delete dir ${dir.getName} success")
-      } else {
-        warn(s"delete dir ${dir.getName} fail")
-      }
-    }
-  }
-
-  import scala.sys.process._
-
-  def checkUsedCapacity(dir: String): Boolean = {
-    val used = (s"df ${dir}" #| s"grep ${dir}").!!
-      .split(" ").filter(_.endsWith("%")) {
-      0
-    }.replace("%", "")
-    info(s"${dir} now used ${used}% space")
-
-    used.toInt > (100 - freeSpaceThreshold)
-  }
-
-  def initializeConfiguration(): Unit = {
-    if (fileExpiredTime < 0) {
-      throw new IllegalArgumentException(s"the env ${FILE_EXPIRED_TIME_KEY} " +
-        s"should be greater than 0")
-    }
-
-    if (deepCleanFileExpiredTime < 0) {
-      throw new IllegalArgumentException(s"the env ${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY} " +
-        s"should be greater than 0")
-    }
-
-    if (sleepTime < 0) {
-      throw new IllegalArgumentException(s"the env ${SLEEP_TIME_KEY} " +
-        s"should be greater than 0")
-    }
-
-    if (freeSpaceThreshold < 0 || freeSpaceThreshold > 100) {
-      throw new IllegalArgumentException(s"the env ${FREE_SPACE_THRESHOLD_KEY} " +
-        s"should between 0 and 100")
-    }
-
-    info(s"finish initializing configuration, " +
-      s"use ${CACHE_DIRS_KEY}: ${cacheDirs.mkString(",")},  " +
-      s"${FILE_EXPIRED_TIME_KEY}: ${fileExpiredTime},  " +
-      s"${FREE_SPACE_THRESHOLD_KEY}: ${freeSpaceThreshold}, " +
-      s"${SLEEP_TIME_KEY}: ${sleepTime}, " +
-      s"${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY}: ${deepCleanFileExpiredTime}")
-  }
-
-  def initializeThreadPool(): ThreadPoolExecutor = {
-    new ThreadPoolExecutor(cacheDirs.length,
-      cacheDirs.length * 2,
-      0,
-      TimeUnit.SECONDS,
-      new LinkedBlockingQueue[Runnable]())
-  }
-
-  def main(args: Array[String]): Unit = {
-    initializeConfiguration()
-    val threadPool = initializeThreadPool()
-    try {
-      while (true) {
-        info("start clean job")
-        cacheDirs.foreach(pathStr => {
-          val path = Paths.get(pathStr)
-
-          if (!Files.exists(path)) {
-            throw new IllegalArgumentException(s"this path ${pathStr} does not exists")
-          }
-
-          if (!Files.isDirectory(path)) {
-            throw new IllegalArgumentException(s"this path ${pathStr} is not directory")
-          }
-
-          // Clean up files older than $fileExpiredTime
-          threadPool.execute(() => {
-            doClean(path.toFile, fileExpiredTime)
-          })
-
-          if (checkUsedCapacity(pathStr)) {
-            info("start deep clean job")
-            // Clean up files older than $deepCleanFileExpiredTime
-            threadPool.execute(() => {
-              doClean(path.toFile, deepCleanFileExpiredTime)
-            })
-            if (checkUsedCapacity(pathStr)) {
-              warn(s"after deep clean ${pathStr} " +
-                s"used space still higher than ${freeSpaceThreshold}")
-            }
-          }
-        })
-        // Once $sleepTime
-        Thread.sleep(sleepTime)
-      }
-    } catch {
-      case exception: Exception => throw exception
-    } finally {
-      if (threadPool != null) {
-        threadPool.shutdown()
-      }
-    }
-  }
-}
diff --git a/tools/kubernetes/docker/spark-block-cleaner/Dockerfile b/tools/spark-block-cleaner/kubernetes/docker/Dockerfile
similarity index 88%
rename from tools/kubernetes/docker/spark-block-cleaner/Dockerfile
rename to tools/spark-block-cleaner/kubernetes/docker/Dockerfile
index 6ce054a..d039dc3 100644
--- a/tools/kubernetes/docker/spark-block-cleaner/Dockerfile
+++ b/tools/spark-block-cleaner/kubernetes/docker/Dockerfile
@@ -23,7 +23,8 @@ RUN apt-get update && \
     mkdir -p /log/cleanerLog
 
 COPY jars /opt/block-cleaner
-COPY entrypoint.sh /opt/entrypoint.sh
+COPY tools/spark-block-cleaner/jars /opt/block-cleaner
+COPY tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh /opt/entrypoint.sh
 
 RUN chmod +x /opt/entrypoint.sh
 
diff --git a/tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh b/tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh
similarity index 100%
rename from tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh
rename to tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh
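
Because the COPY sources are now rooted at tools/spark-block-cleaner/, the image has to be built with the distribution (or repository) root as the Docker build context rather than the docker directory itself. A hedged example; the image name and tag are placeholders, not part of this commit:

    # run from the root of the extracted Kyuubi distribution
    docker build \
      -t <your-registry>/spark-block-cleaner:latest \
      -f tools/spark-block-cleaner/kubernetes/docker/Dockerfile .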
diff --git a/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml b/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml
similarity index 88%
rename from tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml
rename to tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml
index f3757c3..24d3cf6 100644
--- a/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml
+++ b/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml
@@ -48,19 +48,19 @@ spec:
             # the target dirs which in container
             - name: CACHE_DIRS
               value: /data/data1,/data/data2
-            # Cleaner will clean More distant block files
+            # Cleaner will clean More distant block files, seconds
             - name: FILE_EXPIRED_TIME
-              value: 604800000
-            # Deep clean fileExpiredTime
+              value: 604800
+            # Deep clean fileExpiredTime, seconds
             - name: DEEP_CLEAN_FILE_EXPIRED_TIME
-              value: 432000000
+              value: 432000
             # After first clean, if free Space low than threshold
             # trigger deep clean
             - name: FREE_SPACE_THRESHOLD
               value: 60
-            # Cleaner clean sleep times after cleaning
-            - name: SLEEP_TIME
-              value: 3600000
+            # Cleaner clean sleep times after cleaning, seconds
+            - name: SCHEDULE_INTERVAL
+              value: 3600
       volumes:
         # Directory on the host which store block dirs
         - name: block-files-dir-1
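
Note that the manifest now passes durations in seconds and renames SLEEP_TIME to SCHEDULE_INTERVAL; the refactored cleaner (see the new Scala source below) multiplies these values by 1000 internally. A sketch of exercising the same settings outside Kubernetes; the classpath and the direct java invocation mirror what entrypoint.sh is expected to do and are assumptions, not part of this commit:

    # hypothetical local run with the new second-based settings
    export CACHE_DIRS=/data/data1,/data/data2
    export FILE_EXPIRED_TIME=604800              # 7 days
    export DEEP_CLEAN_FILE_EXPIRED_TIME=432000   # 5 days
    export FREE_SPACE_THRESHOLD=60
    export SCHEDULE_INTERVAL=3600                # 1 hour
    java -cp "/opt/block-cleaner/*" org.apache.kyuubi.tools.KubernetesSparkBlockCleaner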
diff --git a/tools/kubernetes/spark-block-cleaner/pom.xml b/tools/spark-block-cleaner/pom.xml
similarity index 87%
rename from tools/kubernetes/spark-block-cleaner/pom.xml
rename to tools/spark-block-cleaner/pom.xml
index e33cb4c..7db8bda 100644
--- a/tools/kubernetes/spark-block-cleaner/pom.xml
+++ b/tools/spark-block-cleaner/pom.xml
@@ -22,16 +22,15 @@
         <artifactId>kyuubi</artifactId>
         <groupId>org.apache.kyuubi</groupId>
         <version>1.3.0-SNAPSHOT</version>
-        <relativePath>../../../pom.xml</relativePath>
+        <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>spark-block-cleaner</artifactId>
-    <name>Kyuubi Tool Kubernetes Spark Block Cleaner</name>
+    <name>Kyuubi Project Spark Block Cleaner</name>
     <packaging>jar</packaging>
 
     <dependencies>
-
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-common</artifactId>
@@ -46,6 +45,11 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala b/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala
new file mode 100644
index 0000000..04b8787
--- /dev/null
+++ b/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tools
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import org.apache.kyuubi.Logging
+
+/*
+* Spark storage shuffle data as the following structure.
+*
+* local-dir1/
+*   blockmgr-uuid/
+*     hash-sub-dir/
+*       shuffle-data
+*       shuffle-index
+*
+* local-dir2/
+*   blockmgr-uuid/
+*     hash-sub-dir/
+*       shuffle-data
+*       shuffle-index
+*
+* ...
+*/
+object KubernetesSparkBlockCleaner extends Logging {
+  import KubernetesSparkBlockCleanerConstants._
+
+  private val envMap = System.getenv()
+
+  private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY,
+    "60").toInt
+  private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY,
+    "604800").toLong * 1000
+  private val scheduleInterval = envMap.getOrDefault(SCHEDULE_INTERVAL,
+    "3600").toLong * 1000
+  private val deepCleanFileExpiredTime = envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY,
+    "432000").toLong * 1000
+  private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) {
+    envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
+  } else {
+    throw new IllegalArgumentException(s"the env $CACHE_DIRS_KEY must be set")
+  }
+  private val isTesting = envMap.getOrDefault("kyuubi.testing", "false").toBoolean
+  checkConfiguration()
+
+  /**
+   * one thread clean one dir
+   */
+  private val threadPool = Executors.newFixedThreadPool(cacheDirs.length)
+
+  private def checkConfiguration(): Unit = {
+    require(fileExpiredTime > 0,
+      s"the env $FILE_EXPIRED_TIME_KEY should be greater than 0")
+    require(deepCleanFileExpiredTime > 0,
+      s"the env $DEEP_CLEAN_FILE_EXPIRED_TIME_KEY should be greater than 0")
+    require(scheduleInterval > 0,
+      s"the env $SCHEDULE_INTERVAL should be greater than 0")
+    require(freeSpaceThreshold > 0 && freeSpaceThreshold < 100,
+      s"the env $FREE_SPACE_THRESHOLD_KEY should between 0 and 100")
+    require(cacheDirs.nonEmpty, s"the env $CACHE_DIRS_KEY must be set")
+    cacheDirs.foreach { dir =>
+      val path = Paths.get(dir)
+      require(Files.exists(path),
+        s"the input cache dir: $dir does not exists")
+      require(Files.isDirectory(path),
+        s"the input cache dir: $dir should be a directory")
+    }
+
+    info(s"finish initializing configuration, " +
+      s"use $CACHE_DIRS_KEY: ${cacheDirs.mkString(",")},  " +
+      s"$FILE_EXPIRED_TIME_KEY: $fileExpiredTime,  " +
+      s"$FREE_SPACE_THRESHOLD_KEY: $freeSpaceThreshold, " +
+      s"$SCHEDULE_INTERVAL: $scheduleInterval, " +
+      s"$DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: $deepCleanFileExpiredTime")
+  }
+
+  private def doClean(dir: File, time: Long) {
+    // clean blockManager shuffle file
+    dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr"))
+      .foreach { blockManagerDir =>
+        info(s"start check blockManager dir ${blockManagerDir.getCanonicalPath}")
+        // check blockManager directory
+        val released = blockManagerDir.listFiles.filter(_.isDirectory).map { subDir =>
+          debug(s"start check sub dir ${subDir.getCanonicalPath}")
+          // check sub directory
+          subDir.listFiles.map(file => checkAndDeleteFile(file, time)).sum
+        }
+        // delete empty blockManager directory and all empty sub directory
+        if (blockManagerDir.listFiles().forall(
+          subDir => subDir.isDirectory && subDir.listFiles().isEmpty)) {
+          blockManagerDir.listFiles().foreach(checkAndDeleteFile(_, time, true))
+          checkAndDeleteFile(blockManagerDir, time, true)
+        }
+        info(s"finished clean blockManager dir ${blockManagerDir.getCanonicalPath}, " +
+          s"released space: ${released.sum / 1024 / 1024} MB.")
+      }
+
+    // clean spark cache file
+    dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark"))
+      .foreach { cacheDir =>
+        info(s"start check cache dir ${cacheDir.getCanonicalPath}")
+        val released = cacheDir.listFiles.map(file => checkAndDeleteFile(file, time))
+        // delete empty spark cache file
+        checkAndDeleteFile(cacheDir, time, true)
+        info(s"finished clean cache dir ${cacheDir.getCanonicalPath}, " +
+          s"released space: ${released.sum / 1024 / 1024} MB.")
+      }
+  }
+
+  private def checkAndDeleteFile(file: File, time: Long, isDir: Boolean = false): Long = {
+    debug(s"check file ${file.getName}")
+    val shouldDeleteFile = if (isDir) {
+      file.listFiles.isEmpty && (System.currentTimeMillis() - file.lastModified() > time)
+    } else {
+      System.currentTimeMillis() - file.lastModified() > time
+    }
+    val length = if (isDir) 0 else file.length()
+    if (shouldDeleteFile) {
+      if (file.delete()) {
+        debug(s"delete file ${file.getAbsolutePath} success")
+        return length
+      } else {
+        warn(s"delete file ${file.getAbsolutePath} fail")
+      }
+    }
+    0L
+  }
+
+  import scala.sys.process._
+
+  private def needToDeepClean(dir: String): Boolean = {
+    val used = (s"df $dir" #| s"grep $dir").!!
+      .split(" ").filter(_.endsWith("%")) {
+      0
+    }.replace("%", "")
+    info(s"$dir now used $used% space")
+
+    used.toInt > (100 - freeSpaceThreshold)
+  }
+
+  private def doCleanJob(dir: String): Unit = {
+    val startTime = System.currentTimeMillis()
+    val path = Paths.get(dir)
+    info(s"start clean job for $dir")
+    doClean(path.toFile, fileExpiredTime)
+    // re check if the disk has enough space
+    if (needToDeepClean(dir)) {
+      info(s"start deep clean job for $dir")
+      doClean(path.toFile, deepCleanFileExpiredTime)
+      if (needToDeepClean(dir)) {
+        warn(s"after deep clean $dir, used space still higher than $freeSpaceThreshold")
+      }
+    }
+    val finishedTime = System.currentTimeMillis()
+    info(s"clean job $dir finished, elapsed time: ${(finishedTime - startTime) / 1000} s.")
+  }
+
+  def main(args: Array[String]): Unit = {
+    do {
+      info(s"start all clean job")
+      val startTime = System.currentTimeMillis()
+      val hasFinished = new CountDownLatch(cacheDirs.length)
+      cacheDirs.foreach { dir =>
+        threadPool.execute(() => {
+          doCleanJob(dir)
+          hasFinished.countDown()
+        })
+      }
+      hasFinished.await()
+
+      val usedTime = System.currentTimeMillis() - startTime
+      info(s"finished to clean all dir, elapsed time $usedTime")
+      if (usedTime > scheduleInterval) {
+        warn(s"clean job elapsed time $usedTime which is greater than $scheduleInterval")
+      } else {
+        Thread.sleep(scheduleInterval - usedTime)
+      }
+    } while (!isTesting)
+  }
+}
+
+object KubernetesSparkBlockCleanerConstants {
+  val CACHE_DIRS_KEY = "CACHE_DIRS"
+  val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME"
+  val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD"
+  val SCHEDULE_INTERVAL = "SCHEDULE_INTERVAL"
+  val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME"
+}
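
For reference, needToDeepClean shells out to df and keeps the field ending in "%", so with the default FREE_SPACE_THRESHOLD of 60 a deep clean pass starts once usage exceeds 40%. An illustrative transcript with made-up numbers:

    $ df /data/data1 | grep /data/data1
    /dev/vdb1      515928320 412742656 103185664  81% /data/data1
    # 81 > (100 - 60), so this dir is cleaned again with
    # DEEP_CLEAN_FILE_EXPIRED_TIME as the cutoff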
diff --git a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala
new file mode 100644
index 0000000..dfaa1f4
--- /dev/null
+++ b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tools
+
+import java.io.File
+import java.nio.file.Files
+import java.util.UUID
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+
+class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite {
+  import KubernetesSparkBlockCleanerConstants._
+
+  private val rootDir = Utils.createTempDir()
+  private val cacheDir = Seq("1", "2").map(rootDir.resolve)
+  private val block1 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}")
+  private val block2 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}")
+
+  // do not remove
+  private val subDir1 = new File(block1, "01")
+  // do not remove
+  private val data11 = new File(subDir1, "shuffle_0_0_0")
+  // remove
+  private val data12 = new File(subDir1, "shuffle_0_0_1")
+
+  // remove
+  private val subDir2 = new File(block2, "02")
+  // remove
+  private val data21 = new File(subDir1, "shuffle_0_1_0")
+
+  private def deleteRecursive(path: File): Unit = {
+    path.listFiles.foreach { f =>
+      if (f.isDirectory) {
+        deleteRecursive(f)
+      } else {
+        f.delete()
+      }
+    }
+    path.delete()
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    cacheDir.foreach(Files.createDirectories(_))
+
+    // create some dir
+    Files.createDirectories(block1.toPath)
+    // hash sub dir
+    Files.createDirectory(subDir1.toPath)
+    data11.createNewFile()
+    data11.setLastModified(System.currentTimeMillis() - 10)
+    data12.createNewFile()
+    Files.write(data12.toPath, "111".getBytes())
+    data12.setLastModified(System.currentTimeMillis() - 10000000)
+
+    Files.createDirectories(block2.toPath)
+    Files.createDirectory(subDir2.toPath)
+    subDir2.setLastModified(System.currentTimeMillis() - 10000000)
+    data21.createNewFile()
+    data21.setLastModified(System.currentTimeMillis() - 10000000)
+  }
+
+  override def afterAll(): Unit = {
+    deleteRecursive(block1)
+    deleteRecursive(block2)
+
+    super.afterAll()
+  }
+
+  private def updateEnv(name: String, value: String): Unit = {
+    val env = System.getenv
+    val field = env.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    field.get(env).asInstanceOf[java.util.Map[String, String]].put(name, value)
+  }
+
+  test("test clean") {
+    updateEnv(CACHE_DIRS_KEY, cacheDir.mkString(","))
+    updateEnv(FILE_EXPIRED_TIME_KEY, "600")
+    updateEnv(SCHEDULE_INTERVAL, "1")
+    updateEnv("kyuubi.testing", "true")
+
+    KubernetesSparkBlockCleaner.main(Array.empty)
+
+    assert(block1.exists())
+    assert(subDir1.exists())
+    assert(data11.exists())
+    assert(!data12.exists())
+
+    assert(block2.exists())
+    assert(!subDir2.exists())
+    assert(!data21.exists())
+  }
+}
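
A hedged way to run the new suite locally, assuming the build/mvn wrapper and that -pl can select the module once the profile adds it to the reactor (example only):

    build/mvn test -Pspark-block-cleaner -pl tools/spark-block-cleaner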