You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/07 04:22:30 UTC

git commit: SPARK-1154: Clean up app folders in worker nodes

Repository: spark
Updated Branches:
  refs/heads/master 410655843 -> 1440154c2


SPARK-1154: Clean up app folders in worker nodes

This is a fix for [SPARK-1154](https://issues.apache.org/jira/browse/SPARK-1154).   The issue is that worker nodes fill up with a huge number of app-* folders after some time.  This change adds a periodic cleanup task which asynchronously deletes app directories older than a configurable TTL.

Two new configuration parameters have been introduced:
  spark.worker.cleanup_interval
  spark.worker.app_data_ttl

This change does not include moving the downloads of application jars to a location outside of the work directory.  We will address that if we have time, but that potentially involves caching so it will come either as part of this PR or a separate PR.

Author: Evan Chan <ev...@ooyala.com>
Author: Kelvin Chu <ke...@yahoo.com>

Closes #288 from velvia/SPARK-1154-cleanup-app-folders and squashes the following commits:

0689995 [Evan Chan] CR from @aarondav - move config, clarify for standalone mode
9f10d96 [Evan Chan] CR from @pwendell - rename configs and add cleanup.enabled
f2f6027 [Evan Chan] CR from @andrewor14
553d8c2 [Kelvin Chu] change the variable name to currentTimeMillis since it actually tracks in seconds
8dc9cb5 [Kelvin Chu] Fixed a bug in Utils.findOldFiles() after merge.
cb52f2b [Kelvin Chu] Change the name of findOldestFiles() to findOldFiles()
72f7d2d [Kelvin Chu] Fix a bug of Utils.findOldestFiles(). file.lastModified is returned in milliseconds.
ad99955 [Kelvin Chu] Add unit test for Utils.findOldestFiles()
dc1a311 [Evan Chan] Don't recompute current time with every new file
e3c408e [Evan Chan] Document the two new settings
b92752b [Evan Chan] SPARK-1154: Add a periodic task to clean up app directories


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1440154c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1440154c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1440154c

Branch: refs/heads/master
Commit: 1440154c27ca48b5a75103eccc9057286d3f6ca8
Parents: 4106558
Author: Evan Chan <ev...@ooyala.com>
Authored: Sun Apr 6 19:17:33 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Apr 6 19:21:40 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  4 +++
 .../org/apache/spark/deploy/worker/Worker.scala | 23 ++++++++++++++++-
 .../scala/org/apache/spark/util/Utils.scala     | 19 ++++++++++++--
 .../org/apache/spark/util/UtilsSuite.scala      | 15 ++++++++++-
 docs/configuration.md                           | 26 ++++++++++++++++++++
 5 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1440154c/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 83ce14a..a7368f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
 
   case class KillDriver(driverId: String) extends DeployMessage
 
+  // Worker internal
+
+  case object WorkDirCleanup      // Sent to Worker actor periodically for cleaning up app folders
+
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription)

http://git-wip-us.apache.org/repos/asf/spark/blob/1440154c/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8a71ddd..bf5a8d0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -64,6 +64,12 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
+  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+  // How often worker will clean up old app folders
+  val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+  // TTL for app folders/data;  after TTL expires it will be cleaned up
+  val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+
   // Index into masterUrls that we're currently trying to register with.
   var masterIndex = 0
 
@@ -179,12 +185,28 @@ private[spark] class Worker(
       registered = true
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+      if (CLEANUP_ENABLED) {
+        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
+          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
+      }
 
     case SendHeartbeat =>
       masterLock.synchronized {
         if (connected) { master ! Heartbeat(workerId) }
       }
 
+    case WorkDirCleanup =>
+      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+      val cleanupFuture = concurrent.future {
+        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
+        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
+          .foreach(Utils.deleteRecursively)
+      }
+      cleanupFuture onFailure {
+        case e: Throwable =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+      }
+
     case MasterChanged(masterUrl, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterUrl)
       changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@ private[spark] class Worker(
 }
 
 private[spark] object Worker {
-
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

http://git-wip-us.apache.org/repos/asf/spark/blob/1440154c/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d3c39de..4435b21 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
     }
 
     if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
-      return false;
+      return false
     } else {
-      return true;
+      return true
+    }
+  }
+
+  /**
+   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
+   * @param dir  must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Files older than this are returned.
+   */
+  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+    val currentTimeMillis = System.currentTimeMillis
+    if (dir.isDirectory) {
+      val files = listFilesSafely(dir)
+      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    } else {
+      throw new IllegalArgumentException(dir + " is not a directory!")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1440154c/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 616214f..eb7fb63 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import scala.util.Random
 
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
     val iterator = Iterator.range(0, 5)
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
+
+  test("findOldFiles") {
+    // create some temporary directories and files
+    val parent: File = Utils.createTempDir()
+    val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+    val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+    // set the last modified time of child1 to 10 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+
+    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
+    assert(result.size.equals(1))
+    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1440154c/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b6005ac..57bda20 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -349,6 +349,32 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>spark.worker.cleanup.enabled</td>
+  <td>true</td>
+  <td>
+    Enable periodic cleanup of worker / application directories.  Note that this only affects standalone
+    mode, as YARN works differently.
+  </td>
+</tr>
+<tr>
+  <td>spark.worker.cleanup.interval</td>
+  <td>1800 (30 minutes)</td>
+  <td>
+    Controls the interval, in seconds, at which the worker cleans up old application work dirs
+    on the local machine.
+  </td>
+</tr>
+<tr>
+  <td>spark.worker.cleanup.appDataTtl</td>
+  <td>7 * 24 * 3600 (7 days)</td>
+  <td>
+    The number of seconds to retain application work directories on each worker.  This is a Time To Live
+    and should depend on the amount of available disk space you have.  Application logs and jars are
+    downloaded to each application work dir.  Over time, the work dirs can quickly fill up disk space,
+    especially if you run jobs very frequently.
+  </td>
+</tr>
+<tr>
   <td>spark.akka.frameSize</td>
   <td>10</td>
   <td>