You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/23 21:02:18 UTC

spark git commit: [SPARK-4834] [standalone] Clean up application files after app finishes.

Repository: spark
Updated Branches:
  refs/heads/master 2d215aeba -> dd155369a


[SPARK-4834] [standalone] Clean up application files after app finishes.

Commit 7aacb7bfa added support for sharing downloaded files among multiple
executors of the same app. That works great in Yarn, since the app's directory
is cleaned up after the app is done.

But Spark standalone mode didn't clean up the app's directory, so the lock/cache
files created by that change were left around and could eventually fill up the
disk hosting /tmp.

To solve that, create app-specific directories under the local dirs when
launching executors. Multiple executors launched by the same Worker will
use the same app directories, so they should be able to share the downloaded
files. When the application finishes, a new message is sent to all workers
telling them the application has finished; once that message has been received,
and all executors registered for the application have shut down, then those
directories will be cleaned up by the Worker.

Note: Unit testing this is hard (if even possible), since local-cluster mode
doesn't seem to leave the Master/Worker daemons running long enough after
`sc.stop()` is called for the clean up protocol to take effect.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #3705 from vanzin/SPARK-4834 and squashes the following commits:

b430534 [Marcelo Vanzin] Remove seemingly unnecessary synchronization.
50eb4b9 [Marcelo Vanzin] Review feedback.
c0e5ea5 [Marcelo Vanzin] [SPARK-4834] [standalone] Clean up application files after app finishes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd155369
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd155369
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd155369

Branch: refs/heads/master
Commit: dd155369a04d7dfbf6a5745cbb243e22218367dc
Parents: 2d215ae
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Dec 23 12:02:08 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Dec 23 12:02:08 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  3 ++
 .../org/apache/spark/deploy/master/Master.scala |  5 +++
 .../spark/deploy/worker/ExecutorRunner.scala    |  4 ++-
 .../org/apache/spark/deploy/worker/Worker.scala | 36 ++++++++++++++++++--
 .../scala/org/apache/spark/util/Utils.scala     | 16 +++++++--
 .../apache/spark/deploy/JsonProtocolSuite.scala |  2 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |  3 +-
 7 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c46f84d..243d8ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -88,6 +88,8 @@ private[deploy] object DeployMessages {
 
   case class KillDriver(driverId: String) extends DeployMessage
 
+  case class ApplicationFinished(id: String)
+
   // Worker internal
 
   case object WorkDirCleanup      // Sent to Worker actor periodically for cleaning up app folders
@@ -175,4 +177,5 @@ private[deploy] object DeployMessages {
   // Liveness checks in various places
 
   case object SendHeartbeat
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ed5eab9..f8137bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -704,6 +704,11 @@ private[spark] class Master(
       }
       persistenceEngine.removeApplication(app)
       schedule()
+
+      // Tell all workers that the application has finished, so they can clean up any app state.
+      workers.foreach { w =>
+        w.actor ! ApplicationFinished(app.id)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f4fedc6..acbdf0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
     val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
+    val appLocalDirs: Seq[String],
     var state: ExecutorState.Value)
   extends Logging {
 
@@ -77,7 +78,7 @@ private[spark] class ExecutorRunner(
   /**
    * Kill executor process, wait for exit and notify worker to update resource status.
    *
-   * @param message the exception message which caused the executor's death 
+   * @param message the exception message which caused the executor's death
    */
   private def killProcess(message: Option[String]) {
     var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@ private[spark] class ExecutorRunner(
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 
       builder.directory(executorDir)
+      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
       builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 6863b62..86a87ec 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import java.util.{UUID, Date}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
   val finishedExecutors = new HashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
+  val appDirectories = new HashMap[String, Seq[String]]
+  val finishedApps = new HashSet[String]
 
   // The shuffle service is not actually started unless configured.
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -294,7 +296,7 @@ private[spark] class Worker(
           val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
           dir.isDirectory && !isAppStillRunning &&
           !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
-        }.foreach { dir => 
+        }.foreach { dir =>
           logInfo(s"Removing directory: ${dir.getPath}")
           Utils.deleteRecursively(dir)
         }
@@ -339,8 +341,19 @@ private[spark] class Worker(
             throw new IOException("Failed to create directory " + executorDir)
           }
 
+          // Create local dirs for the executor. These are passed to the executor via the
+          // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+          // application finishes.
+          val appLocalDirs = appDirectories.get(appId).getOrElse {
+            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+              Utils.createDirectory(dir).getAbsolutePath()
+            }.toSeq
+          }
+          appDirectories(appId) = appLocalDirs
+
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
+            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
+            ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -377,6 +390,7 @@ private[spark] class Worker(
               message.map(" message " + _).getOrElse("") +
               exitStatus.map(" exitStatus " + _).getOrElse(""))
         }
+        maybeCleanupApplication(appId)
       }
 
     case KillExecutor(masterUrl, appId, execId) =>
@@ -446,6 +460,9 @@ private[spark] class Worker(
     case ReregisterWithMaster =>
       reregisterWithMaster()
 
+    case ApplicationFinished(id) =>
+      finishedApps += id
+      maybeCleanupApplication(id)
   }
 
   private def masterDisconnected() {
@@ -454,6 +471,19 @@ private[spark] class Worker(
     registerWithMaster()
   }
 
+  private def maybeCleanupApplication(id: String): Unit = {
+    val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
+    if (shouldCleanup) {
+      finishedApps -= id
+      appDirectories.remove(id).foreach { dirList =>
+        logInfo(s"Cleaning up local directories for application $id")
+        dirList.foreach { dir =>
+          Utils.deleteRecursively(new File(dir))
+        }
+      }
+    }
+  }
+
   def generateWorkerId(): String = {
     "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5e1cb0c..8c00f2c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
     retval
   }
 
-  /** Create a temporary directory inside the given parent directory */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+  /**
+   * Create a directory inside the given parent directory. The directory is guaranteed to be
+   * newly created, and is not marked for automatic deletion.
+   */
+  def createDirectory(root: String): File = {
     var attempts = 0
     val maxAttempts = 10
     var dir: File = null
@@ -265,6 +268,15 @@ private[spark] object Utils extends Logging {
       } catch { case e: SecurityException => dir = null; }
     }
 
+    dir
+  }
+
+  /**
+   * Create a temporary directory inside the given parent directory. The directory will be
+   * automatically deleted when the VM shuts down.
+   */
+  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+    val dir = createDirectory(root)
     registerShutdownDeleteDir(dir)
     dir
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 3f1cd07..aa65f7e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
       new File("sparkHome"), new File("workDir"), "akka://worker",
-      new SparkConf, ExecutorState.RUNNING)
+      new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }
 
   def createDriverRunner(): DriverRunner = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dd155369/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 1962170..6f233d7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org