Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/13 20:11:42 UTC

[1/2] SPARK-1183. Don't use "worker" to mean executor

Repository: spark
Updated Branches:
  refs/heads/master e4e8d8f39 -> 698373211


http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
deleted file mode 100644
index bfa8f84..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-
-trait WorkerRunnableUtil extends Logging {
-
-  val yarnConf: YarnConfiguration
-  val sparkConf: SparkConf
-  lazy val env = prepareEnvironment
-
-  def prepareCommand(
-      masterAddress: String,
-      slaveId: String,
-      hostname: String,
-      workerMemory: Int,
-      workerCores: Int) = {
-    // Extra options for the JVM
-    var JAVA_OPTS = ""
-    // Set the JVM memory
-    val workerMemoryString = workerMemory + "m"
-    JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
-    }
-
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove
-    // it once cpuset version is pushed out.
-    // The context is, default gc for server class machines end up using all cores to do gc - hence
-    // if there are multiple containers in same node, spark gc effects all other containers
-    // performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
-    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
-    // of cores on a node.
-    /*
-        else {
-          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
-          // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
-          // want to mess with it.
-          // In our expts, using (default) throughput collector has severe perf ramnifications in
-          // multi-tennent machines
-          // The options are based on
-          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-          JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-          JAVA_OPTS += " -XX:+CMSIncrementalMode "
-          JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
-        }
-    */
-
-    var javaCommand = "java"
-    val javaHome = System.getenv("JAVA_HOME")
-    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
-      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
-    }
-
-    val commands = List[String](javaCommand +
-      " -server " +
-      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
-      // an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
-      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      workerCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    commands
-  }
-
-  private def setupDistributedCache(
-      file: String,
-      rtype: LocalResourceType,
-      localResources: HashMap[String, LocalResource],
-      timestamp: String,
-      size: String, 
-      vis: String) = {
-    val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
-    amJarRsrc.setTimestamp(timestamp.toLong)
-    amJarRsrc.setSize(size.toLong)
-    localResources(uri.getFragment()) = amJarRsrc
-  }
-
-  def prepareLocalResources: HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    val localResources = HashMap[String, LocalResource]()
-
-    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
-      for( i <- 0 to distFiles.length - 1) {
-        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i), visibilities(i))
-      }
-    }
-
-    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
-      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
-      for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
-          timeStamps(i), fileSizes(i), visibilities(i))
-      }
-    }
-
-    logInfo("Prepared Local resources " + localResources)
-    localResources
-  }
-
-  def prepareEnvironment: HashMap[String, String] = {
-    val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
-
-    // Allow users to specify some environment variables
-    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    env
-  }
-
-}

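The deleted WorkerRunnableUtil above (the new ExecutorRunnable further down mixes in ExecutorRunnableUtil instead) reads its distributed-cache metadata from four parallel, comma-separated environment variables that must line up index by index. Below is a minimal plain-Scala sketch of that convention only; CacheEntry and parseCacheEnv are illustrative names, not part of the patch, and the real trait turns each entry into a YARN LocalResource.

    // Sketch of the parallel-array convention used by SPARK_YARN_CACHE_FILES and friends.
    case class CacheEntry(uri: String, timestamp: Long, size: Long, visibility: String)

    def parseCacheEnv(env: Map[String, String]): Seq[CacheEntry] =
      env.get("SPARK_YARN_CACHE_FILES") match {
        case None => Seq.empty
        case Some(files) =>
          // The companion variables are assumed present whenever the file list is set.
          val uris         = files.split(',')
          val timestamps   = env("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
          val sizes        = env("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
          val visibilities = env("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
          for (i <- 0 until uris.length)
            yield CacheEntry(uris(i), timestamps(i).toLong, sizes(i).toLong, visibilities(i))
      }
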
http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 522e0a9..6b91e6b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.Utils
 
 /**
  *
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
  */
 private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
@@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
 
   override def postStartHook() {
 
-    // The yarn application is running, but the worker might not yet ready
+    // The yarn application is running, but the executor might not yet ready
     // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
     Thread.sleep(2000L)
     logInfo("YarnClientClusterScheduler.postStartHook done")

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e7130d2..d1f13e3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend(
       "--class", "notused",
       "--jar", null,
       "--args", hostport,
-      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+      "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
     )
 
     // process any optional arguments, use the defaults already defined in ClientArguments 
     // if things aren't specified
-    Map("--master-memory" -> "SPARK_MASTER_MEMORY",
-      "--num-workers" -> "SPARK_WORKER_INSTANCES",
-      "--worker-memory" -> "SPARK_WORKER_MEMORY",
-      "--worker-cores" -> "SPARK_WORKER_CORES",
-      "--queue" -> "SPARK_YARN_QUEUE",
-      "--name" -> "SPARK_YARN_APP_NAME",
-      "--files" -> "SPARK_YARN_DIST_FILES",
-      "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
-    .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
+    Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
+      "SPARK_DRIVER_MEMORY" -> "--driver-memory",
+      "SPARK_WORKER_INSTANCES" -> "--num-executors",
+      "SPARK_WORKER_MEMORY" -> "--executor-memory",
+      "SPARK_WORKER_CORES" -> "--executor-cores",
+      "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
+      "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
+      "SPARK_EXECUTOR_CORES" -> "--executor-cores",
+      "SPARK_YARN_QUEUE" -> "--queue",
+      "SPARK_YARN_APP_NAME" -> "--name",
+      "SPARK_YARN_DIST_FILES" -> "--files",
+      "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
+    .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
       
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
@@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   def waitForApp() {
 
-    // TODO : need a better way to find out whether the workers are ready or not
+    // TODO : need a better way to find out whether the executors are ready or not
     // maybe by resource usage report?
     while(true) {
       val report = client.getApplicationReport(appId)

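The hunk above flips the option map so that both the old SPARK_WORKER_* and the new SPARK_EXECUTOR_* environment variables feed the renamed --executor-* flags, with the flag name now on the value side of each pair. A hedged sketch of that pattern follows; addArgIfSet is an illustrative stand-in for the addArg helper in YarnClientSchedulerBackend, which is not shown in this diff and is assumed to append the flag and its value only when the variable is set.

    import scala.collection.mutable.ArrayBuffer

    // Append "--flag value" only when the environment variable is set.
    def addArgIfSet(optName: String, envVar: String, buf: ArrayBuffer[String]): Unit =
      Option(System.getenv(envVar)).foreach { value =>
        buf += optName
        buf += value
      }

    val argsBuf = ArrayBuffer("--class", "notused")
    Map("SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
        "SPARK_EXECUTOR_CORES"  -> "--executor-cores")
      .foreach { case (envVar, optName) => addArgIfSet(optName, envVar, argsBuf) }
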
http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 57d1577..30735cb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private var isLastAMRetry: Boolean = true
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  // Default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3))
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
   
@@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     // Call this to force generation of secret so it gets populated into the
     // hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the worker containers.
+    // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
     // Start the user's JAR
@@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
 
     // Allocate all containers
-    allocateWorkers()
+    allocateExecutors()
 
     // Wait for the user class to Finish
     userThread.join()
@@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }
 
-  // This need to happen before allocateWorkers()
+  // This need to happen before allocateExecutors()
   private def waitForSparkContextInitialized() {
     logInfo("Waiting for Spark context initialization")
     try {
@@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
-  private def allocateWorkers() {
+  private def allocateExecutors() {
     try {
-      logInfo("Allocating " + args.numWorkers + " workers.")
+      logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
-      yarnAllocator.addResourceRequests(args.numWorkers)
+      yarnAllocator.addResourceRequests(args.numExecutors)
       // Exits the loop if the user thread exits.
-      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
-        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
           finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of worker failures reached")
+            "max number of executor failures reached")
         }
         yarnAllocator.allocateResources()
         ApplicationMaster.incrementAllocatorLoop(1)
@@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
-    logInfo("All workers have launched.")
+    logInfo("All executors have launched.")
 
     // Launch a progress reporter thread, else the app will get killed after expiration
     // (def: 10mins) timeout.
@@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
             finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of worker failures reached")
+              "max number of executor failures reached")
           }
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
             yarnAllocator.getNumPendingAllocate
-          if (missingWorkerCount > 0) {
+          if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.addResourceRequests(missingWorkerCount)
+              format(missingExecutorCount))
+            yarnAllocator.addResourceRequests(missingExecutorCount)
           }
           sendProgress()
           Thread.sleep(sleepTime)

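The first hunk above keeps backward compatibility for the renamed failure limit: the new spark.yarn.max.executor.failures key is consulted first, then the deprecated spark.yarn.max.worker.failures key, then the computed default of max(numExecutors * 2, 3). A minimal sketch of that lookup, with numExecutors standing in for args.numExecutors:

    import org.apache.spark.SparkConf

    // Prefer the new key, fall back to the deprecated one, then to the computed default.
    def maxExecutorFailures(conf: SparkConf, numExecutors: Int): Int =
      conf.getInt("spark.yarn.max.executor.failures",
        conf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))
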
http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
new file mode 100644
index 0000000..b697f10
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.Socket
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import akka.actor._
+import akka.remote._
+import akka.actor.Terminated
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
+
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private var appAttemptId: ApplicationAttemptId = _
+  private var reporterThread: Thread = _
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var driverClosed:Boolean = false
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+
+  val securityManager = new SecurityManager(sparkConf)
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+    conf = sparkConf, securityManager = securityManager)._1
+  var actor: ActorRef = _
+
+  // This actor just working as a monitor to watch on Driver Actor.
+  class MonitorActor(driverUrl: String) extends Actor {
+
+    var driver: ActorSelection = _
+
+    override def preStart() {
+      logInfo("Listen to driver: " + driverUrl)
+      driver = context.actorSelection(driverUrl)
+      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      driver ! "Hello"
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+
+    override def receive = {
+      case x: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+        driverClosed = true
+    }
+  }
+
+  def run() {
+
+    // Setup the directories so things go to yarn approved directories rather
+    // then user specified and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(yarnConf)
+    amClient.start()
+
+    appAttemptId = getApplicationAttemptId()
+    registerApplicationMaster()
+
+    waitForSparkMaster()
+
+    // Allocate all containers
+    allocateExecutors()
+
+    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+
+    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
+    reporterThread = launchReporterThread(interval)
+    
+
+    // Wait for the reporter thread to Finish.
+    reporterThread.join()
+
+    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+    actorSystem.shutdown()
+
+    logInfo("Exited")
+    System.exit(0)
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+ 
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
+    }
+  } 
+
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val envs = System.getenv()
+    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
+    appAttemptId
+  }
+
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    // TODO:(Raymond) Find out Spark UI address and fill in here?
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+  }
+
+  private def waitForSparkMaster() {
+    logInfo("Waiting for Spark driver to be reachable.")
+    var driverUp = false
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+    while(!driverUp) {
+      try {
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
+        Thread.sleep(100)
+      }
+    }
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
+
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+  }
+
+
+  private def allocateExecutors() {
+
+    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map()
+
+    yarnAllocator = YarnAllocationHandler.newAllocator(
+      yarnConf,
+      amClient,
+      appAttemptId,
+      args,
+      preferredNodeLocationData,
+      sparkConf)
+
+    logInfo("Allocating " + args.numExecutors + " executors.")
+    // Wait until all containers have finished
+    // TODO: This is a bit ugly. Can we make it nicer?
+    // TODO: Handle container failure
+
+    yarnAllocator.addResourceRequests(args.numExecutors)
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      yarnAllocator.allocateResources()
+      Thread.sleep(100)
+    }
+
+    logInfo("All executors have launched.")
+
+  }
+
+  // TODO: We might want to extend this to allocate more containers in case they die !
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+
+    val t = new Thread {
+      override def run() {
+        while (!driverClosed) {
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+            yarnAllocator.getNumPendingAllocate
+          if (missingExecutorCount > 0) {
+            logInfo("Allocating %d containers to make up for (potentially) lost containers".
+              format(missingExecutorCount))
+            yarnAllocator.addResourceRequests(missingExecutorCount)
+          }
+          sendProgress()
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // setting to daemon status, though this is usually not a good idea.
+    t.setDaemon(true)
+    t.start()
+    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    t
+  }
+
+  private def sendProgress() {
+    logDebug("Sending progress")
+    // simulated with an allocate request with no nodes requested ...
+    yarnAllocator.allocateResources()
+  }
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+    logInfo("finish ApplicationMaster with " + status)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+  }
+
+}
+
+
+object ExecutorLauncher {
+  def main(argStrings: Array[String]) {
+    val args = new ApplicationMasterArguments(argStrings)
+    new ExecutorLauncher(args).run()
+  }
+}

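ExecutorLauncher.run() above picks its reporter interval as the smaller of the configured heartbeat and half the ResourceManager's AM expiry timeout, so progress is always reported before the application can be expired. A one-line sketch of that choice:

    // Heartbeat at the configured rate, but never slower than half the AM expiry timeout.
    def reporterInterval(amExpiryMs: Long, configuredHeartbeatMs: Long): Long =
      math.min(amExpiryMs / 2, configuredHeartbeatMs)

    // With the defaults above: reporterInterval(120000L, 5000L) == 5000L (milliseconds)
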
http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
new file mode 100644
index 0000000..53c403f
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+
+import org.apache.spark.{SparkConf, Logging}
+
+
+class ExecutorRunnable(
+    container: Container,
+    conf: Configuration,
+    spConf: SparkConf,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    executorMemory: Int,
+    executorCores: Int)
+  extends Runnable with ExecutorRunnableUtil with Logging {
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  var nmClient: NMClient = _
+  val sparkConf = spConf
+  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  def run = {
+    logInfo("Starting Executor Container")
+    nmClient = NMClient.createNMClient()
+    nmClient.init(yarnConf)
+    nmClient.start()
+    startContainer
+  }
+
+  def startContainer = {
+    logInfo("Setting up ContainerLaunchContext")
+
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+
+    val localResources = prepareLocalResources
+    ctx.setLocalResources(localResources)
+
+    ctx.setEnvironment(env)
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
+
+    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+
+    logInfo("Setting up executor with commands: " + commands)
+    ctx.setCommands(commands)
+
+    // Send the start request to the ContainerManager
+    nmClient.startContainer(container, ctx)
+  }
+
+}

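ExecutorRunnable is driven by the allocator rather than invoked directly: as the YarnAllocationHandler hunk further down shows, one instance is constructed per allocated container and started on its own thread. A minimal usage sketch, with launchExecutor as an illustrative wrapper and the parameters standing in for the values the allocator holds for a given container:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.api.records.Container
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.ExecutorRunnable

    // One ExecutorRunnable per allocated container, each started on its own thread.
    def launchExecutor(
        container: Container,
        conf: Configuration,
        sparkConf: SparkConf,
        driverUrl: String,
        executorId: String,
        executorHostname: String,
        executorMemory: Int,
        executorCores: Int): Unit = {
      val runnable = new ExecutorRunnable(container, conf, sparkConf, driverUrl,
        executorId, executorHostname, executorMemory, executorCores)
      // run() sets up the ContainerLaunchContext and asks the NodeManager to start the container.
      new Thread(runnable).start()
    }
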
http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
deleted file mode 100644
index f1c1fea..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
-  extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private var appAttemptId: ApplicationAttemptId = _
-  private var reporterThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
-
-  private var amClient: AMRMClient[ContainerRequest] = _
-
-  val securityManager = new SecurityManager(sparkConf)
-  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf, securityManager = securityManager)._1
-  var actor: ActorRef = _
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-    }
-  }
-
-  def run() {
-
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
-    amClient = AMRMClient.createAMRMClient()
-    amClient.init(yarnConf)
-    amClient.start()
-
-    appAttemptId = getApplicationAttemptId()
-    registerApplicationMaster()
-
-    waitForSparkMaster()
-
-    // Allocate all containers
-    allocateWorkers()
-
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-    // must be <= timeoutInterval / 2.
-    val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-    reporterThread = launchReporterThread(interval)
-    
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
- 
-    localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
-      case Some(l) => l
-    }
-  } 
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    // TODO:(Raymond) Find out Spark UI address and fill in here?
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host", driverHost)
-    sparkConf.set("spark.driver.port", driverPort.toString)
-
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-
-  private def allocateWorkers() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(
-      yarnConf,
-      amClient,
-      appAttemptId,
-      args,
-      preferredNodeLocationData,
-      sparkConf)
-
-    logInfo("Allocating " + args.numWorkers + " workers.")
-    // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-
-    yarnAllocator.addResourceRequests(args.numWorkers)
-    while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
-      yarnAllocator.allocateResources()
-      Thread.sleep(100)
-    }
-
-    logInfo("All workers have launched.")
-
-  }
-
-  // TODO: We might want to extend this to allocate more containers in case they die !
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed) {
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.addResourceRequests(missingWorkerCount)
-          }
-          sendProgress()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateResources()
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
-  }
-
-}
-
-
-object WorkerLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index ab4a79b..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.client.api.NMClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, Logging}
-
-
-class WorkerRunnable(
-    container: Container,
-    conf: Configuration,
-    spConf: SparkConf,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    workerMemory: Int,
-    workerCores: Int) 
-  extends Runnable with WorkerRunnableUtil with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var nmClient: NMClient = _
-  val sparkConf = spConf
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Worker Container")
-    nmClient = NMClient.createNMClient()
-    nmClient.init(yarnConf)
-    nmClient.start()
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    ctx.setEnvironment(env)
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
-
-    val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
-
-    logInfo("Setting up worker with commands: " + commands)
-    ctx.setCommands(commands)
-
-    // Send the start request to the ContainerManager
-    nmClient.startContainer(container, ctx)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 1ac6112..e31c406 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
     val amClient: AMRMClient[ContainerRequest],
     val appAttemptId: ApplicationAttemptId,
-    val maxWorkers: Int,
-    val workerMemory: Int,
-    val workerCores: Int,
+    val maxExecutors: Int,
+    val executorMemory: Int,
+    val executorCores: Int,
     val preferredHostToCount: Map[String, Int], 
     val preferredRackToCount: Map[String, Int],
     val sparkConf: SparkConf)
@@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler(
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
-  private val numWorkersRunning = new AtomicInteger()
-  // Used to generate a unique id per worker
-  private val workerIdCounter = new AtomicInteger()
+  private val numExecutorsRunning = new AtomicInteger()
+  // Used to generate a unique id per executor
+  private val executorIdCounter = new AtomicInteger()
   private val lastResponseId = new AtomicInteger()
-  private val numWorkersFailed = new AtomicInteger()
+  private val numExecutorsFailed = new AtomicInteger()
 
   def getNumPendingAllocate: Int = numPendingAllocate.intValue
 
-  def getNumWorkersRunning: Int = numWorkersRunning.intValue
+  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
-  def getNumWorkersFailed: Int = numWorkersFailed.intValue
+  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
   }
 
   def releaseContainer(container: Container) {
@@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler(
 
       logDebug("""
         Allocated containers: %d
-        Current worker count: %d
+        Current executor count: %d
         Containers released: %s
         Containers to-be-released: %s
         Cluster resources: %s
         """.format(
           allocatedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers,
           allocateResponse.getAvailableResources))
@@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler(
 
       // Run each of the allocated containers.
       for (container <- allocatedContainersToProcess) {
-        val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
-        val workerHostname = container.getNodeId.getHost
+        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+        val executorHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-        assert(container.getResource.getMemory >= workerMemoryOverhead)
+        val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+        assert(container.getResource.getMemory >= executorMemoryOverhead)
 
-        if (numWorkersRunningNow > maxWorkers) {
+        if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, workerHostname))
+            containers for it.""".format(containerId, executorHostname))
           releaseContainer(container)
-          numWorkersRunning.decrementAndGet()
+          numExecutorsRunning.decrementAndGet()
         } else {
-          val workerId = workerIdCounter.incrementAndGet().toString
+          val executorId = executorIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
             sparkConf.get("spark.driver.host"),
             sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
-          logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
+          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
 
           // To be safe, remove the container from `pendingReleaseContainers`.
           pendingReleaseContainers.remove(containerId)
 
-          val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+          val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
           allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
               new HashSet[ContainerId]())
 
             containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, workerHostname)
+            allocatedContainerToHostMap.put(containerId, executorHostname)
 
             if (rack != null) {
               allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
             }
           }
-          logInfo("Launching WorkerRunnable. driverUrl: %s,  workerHostname: %s".format(driverUrl, workerHostname))
-          val workerRunnable = new WorkerRunnable(
+          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(driverUrl, executorHostname))
+          val executorRunnable = new ExecutorRunnable(
             container,
             conf,
             sparkConf,
             driverUrl,
-            workerId,
-            workerHostname,
-            workerMemory,
-            workerCores)
-          new Thread(workerRunnable).start()
+            executorId,
+            executorHostname,
+            executorMemory,
+            executorCores)
+          new Thread(executorRunnable).start()
         }
       }
       logDebug("""
         Finished allocating %s containers (from %s originally).
-        Current number of workers running: %d,
+        Current number of executors running: %d,
         releasedContainerList: %s,
         pendingReleaseContainers: %s
         """.format(
           allocatedContainersToProcess,
           allocatedContainers,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers))
     }
@@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler(
           // `pendingReleaseContainers`.
           pendingReleaseContainers.remove(containerId)
         } else {
-          // Decrement the number of workers running. The next iteration of the ApplicationMaster's
+          // Decrement the number of executors running. The next iteration of the ApplicationMaster's
           // reporting thread will take care of allocating.
-          numWorkersRunning.decrementAndGet()
+          numExecutorsRunning.decrementAndGet()
           logInfo("Completed container %s (state: %s, exit status: %s)".format(
             containerId,
             completedContainer.getState,
@@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler(
           // now I think its ok as none of the containers are expected to exit
           if (completedContainer.getExitStatus() != 0) {
             logInfo("Container marked as failed: " + containerId)
-            numWorkersFailed.incrementAndGet()
+            numExecutorsFailed.incrementAndGet()
           }
         }
 
@@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler(
       }
       logDebug("""
         Finished processing %d completed containers.
-        Current number of workers running: %d,
+        Current number of executors running: %d,
         releasedContainerList: %s,
         pendingReleaseContainers: %s
         """.format(
           completedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers))
     }
@@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler(
     retval
   }
 
-  def addResourceRequests(numWorkers: Int) {
+  def addResourceRequests(numExecutors: Int) {
     val containerRequests: List[ContainerRequest] =
-      if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
-        logDebug("numWorkers: " + numWorkers + ", host preferences: " +
+      if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+        logDebug("numExecutors: " + numExecutors + ", host preferences: " +
           preferredHostToCount.isEmpty)
         createResourceRequests(
           AllocationType.ANY,
           resource = null,
-          numWorkers,
+          numExecutors,
           YarnAllocationHandler.PRIORITY).toList
       } else {
-        // Request for all hosts in preferred nodes and for numWorkers - 
+        // Request for all hosts in preferred nodes and for numExecutors - 
         // candidates.size, request by default allocation policy.
         val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
         for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler(
         val anyContainerRequests = createResourceRequests(
           AllocationType.ANY,
           resource = null,
-          numWorkers,
+          numExecutors,
           YarnAllocationHandler.PRIORITY)
 
         val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
@@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler(
       amClient.addContainerRequest(request)
     }
 
-    if (numWorkers > 0) {
-      numPendingAllocate.addAndGet(numWorkers)
-      logInfo("Will Allocate %d worker containers, each with %d memory".format(
-        numWorkers,
-        (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+    if (numExecutors > 0) {
+      numPendingAllocate.addAndGet(numExecutors)
+      logInfo("Will Allocate %d executor containers, each with %d memory".format(
+        numExecutors,
+        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
   private def createResourceRequests(
       requestType: AllocationType.AllocationType,
       resource: String,
-      numWorkers: Int,
+      numExecutors: Int,
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
@@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler(
         val nodeLocal = constructContainerRequests(
           Array(hostname),
           racks = null,
-          numWorkers,
+          numExecutors,
           priority)
 
         // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
@@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler(
       }
       case AllocationType.RACK => {
         val rack = resource
-        constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
+        constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
       }
       case AllocationType.ANY => constructContainerRequests(
-        hosts = null, racks = null, numWorkers, priority)
+        hosts = null, racks = null, numExecutors, priority)
       case _ => throw new IllegalArgumentException(
         "Unexpected/unsupported request type: " + requestType)
     }
@@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler(
   private def constructContainerRequests(
       hosts: Array[String],
       racks: Array[String],
-      numWorkers: Int,
+      numExecutors: Int,
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-    val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    val resource = Resource.newInstance(memoryRequest, workerCores)
+    val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val resource = Resource.newInstance(memoryRequest, executorCores)
 
     val prioritySetting = Records.newRecord(classOf[Priority])
     prioritySetting.setPriority(priority)
 
     val requests = new ArrayBuffer[ContainerRequest]()
-    for (i <- 0 until numWorkers) {
+    for (i <- 0 until numExecutors) {
       requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
     }
     requests
@@ -574,9 +574,9 @@ object YarnAllocationHandler {
       conf,
       amClient,
       appAttemptId,
-      args.numWorkers, 
-      args.workerMemory,
-      args.workerCores,
+      args.numExecutors, 
+      args.executorMemory,
+      args.executorCores,
       Map[String, Int](),
       Map[String, Int](),
       sparkConf)
@@ -596,9 +596,9 @@ object YarnAllocationHandler {
       conf,
       amClient,
       appAttemptId,
-      args.numWorkers, 
-      args.workerMemory,
-      args.workerCores,
+      args.numExecutors, 
+      args.executorMemory,
+      args.executorCores,
       hostToSplitCount,
       rackToSplitCount,
       sparkConf)
@@ -608,9 +608,9 @@ object YarnAllocationHandler {
       conf: Configuration,
       amClient: AMRMClient[ContainerRequest],
       appAttemptId: ApplicationAttemptId,
-      maxWorkers: Int,
-      workerMemory: Int,
-      workerCores: Int,
+      maxExecutors: Int,
+      executorMemory: Int,
+      executorCores: Int,
       map: collection.Map[String, collection.Set[SplitInfo]],
       sparkConf: SparkConf
     ): YarnAllocationHandler = {
@@ -619,9 +619,9 @@ object YarnAllocationHandler {
       conf,
       amClient,
       appAttemptId,
-      maxWorkers,
-      workerMemory,
-      workerCores,
+      maxExecutors,
+      executorMemory,
+      executorCores,
       hostToCount,
       rackToCount,
       sparkConf)
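
For reference, the allocator above sizes every executor container as executorMemory plus a fixed memory overhead before handing the request to the AMRMClient. A minimal standalone sketch of that request construction, assuming Hadoop's stable AMRMClient ContainerRequest API and an illustrative overhead constant (the real value comes from YarnAllocationHandler.MEMORY_OVERHEAD):

    import scala.collection.mutable.ArrayBuffer

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.hadoop.yarn.util.Records

    // Build one ContainerRequest per executor, padding the executor memory with
    // the allocator's fixed overhead so the granted container is large enough.
    def executorRequests(
        numExecutors: Int,
        executorMemory: Int,
        executorCores: Int,
        priority: Int,
        memoryOverhead: Int = 384  // illustrative stand-in for YarnAllocationHandler.MEMORY_OVERHEAD
      ): ArrayBuffer[ContainerRequest] = {
      val capability = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
      val prio = Records.newRecord(classOf[Priority])
      prio.setPriority(priority)
      val requests = new ArrayBuffer[ContainerRequest]()
      for (_ <- 0 until numExecutors) {
        // hosts = null, racks = null corresponds to an AllocationType.ANY request.
        requests += new ContainerRequest(capability, null, null, prio)
      }
      requests
    }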


[2/2] git commit: SPARK-1183. Don't use "worker" to mean executor

Posted by pw...@apache.org.
SPARK-1183. Don't use "worker" to mean executor

Author: Sandy Ryza <sa...@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69837321
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69837321
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69837321

Branch: refs/heads/master
Commit: 698373211ef3cdf841c82d48168cd5dbe00a57b4
Parents: e4e8d8f
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Thu Mar 13 12:11:33 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Mar 13 12:11:33 2014 -0700

----------------------------------------------------------------------
 docs/cluster-overview.md                        |   2 +-
 docs/graphx-programming-guide.md                |   2 +-
 docs/job-scheduling.md                          |   4 +-
 docs/mllib-classification-regression.md         |   4 +-
 docs/python-programming-guide.md                |   6 +-
 docs/running-on-yarn.md                         |  29 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  38 +--
 .../spark/deploy/yarn/ExecutorLauncher.scala    | 272 +++++++++++++++++++
 .../spark/deploy/yarn/ExecutorRunnable.scala    | 117 ++++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 272 -------------------
 .../spark/deploy/yarn/WorkerRunnable.scala      | 117 --------
 .../deploy/yarn/YarnAllocationHandler.scala     | 124 ++++-----
 .../yarn/ApplicationMasterArguments.scala       |  27 +-
 .../spark/deploy/yarn/ClientArguments.scala     |  46 ++--
 .../apache/spark/deploy/yarn/ClientBase.scala   |  18 +-
 .../yarn/ClientDistributedCacheManager.scala    |   4 +-
 .../deploy/yarn/ExecutorRunnableUtil.scala      | 176 ++++++++++++
 .../spark/deploy/yarn/WorkerRunnableUtil.scala  | 176 ------------
 .../cluster/YarnClientClusterScheduler.scala    |   4 +-
 .../cluster/YarnClientSchedulerBackend.scala    |  26 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  38 +--
 .../spark/deploy/yarn/ExecutorLauncher.scala    | 252 +++++++++++++++++
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  90 ++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 252 -----------------
 .../spark/deploy/yarn/WorkerRunnable.scala      |  90 ------
 .../deploy/yarn/YarnAllocationHandler.scala     | 138 +++++-----
 26 files changed, 1171 insertions(+), 1153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/docs/cluster-overview.md
----------------------------------------------------------------------
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index e167032..a555a7b 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
 Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
 (either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
 applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
-worker processes that run computations and store data for your application.
+processes that run computations and store data for your application.
 Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
 the executors. Finally, SparkContext sends *tasks* for the executors to run.
 
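As a rough illustration of the flow described above, a minimal driver program (names and settings are illustrative, not part of this commit): once the SparkContext is constructed, the executors it acquired run the tasks and hold the cached data.

    import org.apache.spark.{SparkConf, SparkContext}

    object ExecutorOverviewExample {
      def main(args: Array[String]) {
        // Connect to a cluster manager; Spark then acquires executor processes
        // on the cluster for this application.
        val conf = new SparkConf()
          .setMaster("yarn-client")              // or spark://..., mesos://..., local[*]
          .setAppName("ExecutorOverviewExample")
          .set("spark.executor.memory", "2g")
        val sc = new SparkContext(conf)

        // Tasks of this job run inside the acquired executors.
        val histogram = sc.parallelize(1 to 1000000).map(_ % 10).countByValue()
        println(histogram)

        sc.stop()
      }
    }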

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 3dfed7b..1238e3e 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -135,7 +135,7 @@ Like RDDs, property graphs are immutable, distributed, and fault-tolerant.  Chan
 structure of the graph are accomplished by producing a new graph with the desired changes.  Note
 that substantial parts of the original graph (i.e., unaffected structure, attributes, and indicies)
 are reused in the new graph reducing the cost of this inherently functional data-structure.  The
-graph is partitioned across the workers using a range of vertex-partitioning heuristics.  As with
+graph is partitioned across the executors using a range of vertex-partitioning heuristics.  As with
 RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
 
 Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
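
To make the partitioning note concrete, a small sketch (toy graph and strategy choice are illustrative, assuming GraphX's partitionBy API) that redistributes a property graph's edge partitions across the executors:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

    object GraphPartitioningExample {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[*]").setAppName("GraphPartitioningExample"))

        // A tiny property graph built from an edge list.
        val edges = sc.parallelize(Seq(
          Edge(1L, 2L, "follows"),
          Edge(2L, 3L, "follows"),
          Edge(3L, 1L, "likes")))
        val graph = Graph.fromEdges(edges, defaultValue = "user")

        // Repartition the edges with one of the built-in vertex-cut heuristics.
        val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
        println("edge partitions: " + repartitioned.edges.partitions.length)

        sc.stop()
      }
    }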

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index df2faa5..94604f3 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -39,8 +39,8 @@ Resource allocation can be configured as follows, based on the cluster type:
 * **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
   and optionally set `spark.cores.max` to limit each application's resource share as in the standalone mode.
   You should also set `spark.executor.memory` to control the executor memory.
-* **YARN:** The `--num-workers` option to the Spark YARN client controls how many workers it will allocate
-  on the cluster, while `--worker-memory` and `--worker-cores` control the resources per worker.
+* **YARN:** The `--num-executors` option to the Spark YARN client controls how many executors it will allocate
+  on the cluster, while `--executor-memory` and `--executor-cores` control the resources per executor.
 
 A second option available on Mesos is _dynamic sharing_ of CPU cores. In this mode, each Spark application
 still has a fixed and independent memory allocation (set by `spark.executor.memory`), but when the
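
For example, a coarse-grained (statically partitioned) Mesos configuration might look like the following sketch; the property names are the ones listed above, the values and master URL are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object StaticPartitioningExample {
      def main(args: Array[String]) {
        // Static partitioning on Mesos: claim a fixed share of the cluster up front.
        val conf = new SparkConf()
          .setMaster("mesos://master:5050")        // illustrative master URL
          .setAppName("StaticPartitioningExample")
          .set("spark.mesos.coarse", "true")       // coarse-grained mode
          .set("spark.cores.max", "24")            // cap this application's core share
          .set("spark.executor.memory", "4g")      // memory per executor
        val sc = new SparkContext(conf)
        // ... jobs submitted through sc now run on the statically allocated executors ...
        sc.stop()
      }
    }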

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/docs/mllib-classification-regression.md
----------------------------------------------------------------------
diff --git a/docs/mllib-classification-regression.md b/docs/mllib-classification-regression.md
index 18a3e8e..d5bd804 100644
--- a/docs/mllib-classification-regression.md
+++ b/docs/mllib-classification-regression.md
@@ -77,8 +77,8 @@ between the two goals of small loss and small model complexity.
 
 **Distributed Datasets.**
 For all currently implemented optimization methods for classification, the data must be
-distributed between the worker machines *by examples*. Every machine holds a consecutive block of
-the `$n$` example/label pairs `$(\x_i,y_i)$`. 
+distributed between processes on the worker machines *by examples*. Machines hold consecutive
+blocks of the `$n$` example/label pairs `$(\x_i,y_i)$`. 
 In other words, the input distributed dataset
 ([RDD](scala-programming-guide.html#resilient-distributed-datasets-rdds)) must be the set of
 vectors `$\x_i\in\R^d$`.
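
In code terms, the requirement above simply means the training set is a distributed collection of example/label pairs, partitioned by examples; a minimal sketch with plain RDDs (toy data, no MLlib types assumed):

    import org.apache.spark.{SparkConf, SparkContext}

    object ExamplePartitioningSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[*]").setAppName("ExamplePartitioningSketch"))

        // (label, feature vector) pairs; each partition holds a consecutive block of examples.
        val examples = sc.parallelize(Seq(
          (1.0, Array(0.0, 1.1, 0.1)),
          (0.0, Array(2.0, 1.0, -1.0)),
          (1.0, Array(0.5, 0.3, 0.2))), numSlices = 3)

        println("partitions: " + examples.partitions.length)
        sc.stop()
      }
    }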

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/docs/python-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 57ed54c..cbe7d82 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -43,9 +43,9 @@ def is_error(line):
 errors = logData.filter(is_error)
 {% endhighlight %}
 
-PySpark will automatically ship these functions to workers, along with any objects that they reference.
-Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
-The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
+PySpark will automatically ship these functions to executors, along with any objects that they reference.
+Instances of classes will be serialized and shipped to executors by PySpark, but classes themselves cannot be automatically distributed to executors.
+The [Standalone Use](#standalone-use) section describes how to ship code dependencies to executors.
 
 In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index b179295..2e9dec4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -41,7 +41,7 @@ System Properties:
 * `spark.yarn.submit.file.replication`, the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
 * `spark.yarn.preserve.staging.files`, set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
 * `spark.yarn.scheduler.heartbeat.interval-ms`, the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds. 
-* `spark.yarn.max.worker.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
+* `spark.yarn.max.executor.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
 
 # Launching Spark on YARN
 
@@ -60,11 +60,10 @@ The command to launch the Spark application on the cluster is as follows:
       --jar <YOUR_APP_JAR_FILE> \
       --class <APP_MAIN_CLASS> \
       --args <APP_MAIN_ARGUMENTS> \
-      --num-workers <NUMBER_OF_EXECUTORS> \
-      --master-class <ApplicationMaster_CLASS>
-      --master-memory <MEMORY_FOR_MASTER> \
-      --worker-memory <MEMORY_PER_EXECUTOR> \
-      --worker-cores <CORES_PER_EXECUTOR> \
+      --num-executors <NUMBER_OF_EXECUTOR_PROCESSES> \
+      --driver-memory <MEMORY_FOR_ApplicationMaster> \
+      --executor-memory <MEMORY_PER_EXECUTOR> \
+      --executor-cores <CORES_PER_EXECUTOR> \
       --name <application_name> \
       --queue <queue_name> \
       --addJars <any_local_files_used_in_SparkContext.addJar> \
@@ -85,10 +84,10 @@ For example:
           --jar examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
           --class org.apache.spark.examples.SparkPi \
           --args yarn-cluster \
-          --num-workers 3 \
-          --master-memory 4g \
-          --worker-memory 2g \
-          --worker-cores 1
+          --num-executors 3 \
+          --driver-memory 4g \
+          --executor-memory 2g \
+          --executor-cores 1
 
 The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running.  Refer to the "Viewing Logs" section below for how to see driver and executor logs.
 
@@ -100,12 +99,12 @@ With yarn-client mode, the application will be launched locally, just like runni
 
 Configuration in yarn-client mode:
 
-In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.
+In order to tune executor cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.
 
-* `SPARK_WORKER_INSTANCES`, Number of executors to start (Default: 2)
-* `SPARK_WORKER_CORES`, Number of cores per executor (Default: 1).
-* `SPARK_WORKER_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
+* `SPARK_EXECUTOR_INSTANCES`, Number of executors to start (Default: 2)
+* `SPARK_EXECUTOR_CORES`, Number of cores per executor (Default: 1).
+* `SPARK_EXECUTOR_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+* `SPARK_DRIVER_MEMORY`, Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
 * `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
 * `SPARK_YARN_QUEUE`, The YARN queue to use for allocation requests (Default: 'default')
 * `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job.
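
As a sketch of how the renamed tuning variables might be read with their documented defaults (illustrative fragment only; the actual parsing lives in the YARN client code):

    // Read the yarn-client tuning variables, falling back to the documented defaults.
    val numExecutors   = sys.env.getOrElse("SPARK_EXECUTOR_INSTANCES", "2").toInt
    val executorCores  = sys.env.getOrElse("SPARK_EXECUTOR_CORES", "1").toInt
    val executorMemory = sys.env.getOrElse("SPARK_EXECUTOR_MEMORY", "1g")
    val driverMemory   = sys.env.getOrElse("SPARK_DRIVER_MEMORY", "512m")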

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 87785cd..910484e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
 
-  // Default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3))
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
 
@@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     // Call this to force generation of secret so it gets populated into the
     // hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the worker containers.
+    // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
     // Start the user's JAR
@@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
 
     // Allocate all containers
-    allocateWorkers()
+    allocateExecutors()
 
     // Wait for the user class to Finish
     userThread.join()
@@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }
 
-  // this need to happen before allocateWorkers
+  // this need to happen before allocateExecutors
   private def waitForSparkContextInitialized() {
     logInfo("Waiting for spark context initialization")
     try {
@@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
-  private def allocateWorkers() {
+  private def allocateExecutors() {
     try {
-      logInfo("Allocating " + args.numWorkers + " workers.")
+      logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
 
       // Exists the loop if the user thread exits.
-      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
-        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
           finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of worker failures reached")
+            "max number of executor failures reached")
         }
         yarnAllocator.allocateContainers(
-          math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+          math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(100)
       }
@@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
-    logInfo("All workers have launched.")
+    logInfo("All executors have launched.")
 
     // Launch a progress reporter thread, else the app will get killed after expiration
     // (def: 10mins) timeout.
@@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
             finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of worker failures reached")
+              "max number of executor failures reached")
           }
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+          if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.allocateContainers(missingWorkerCount)
+              format(missingExecutorCount))
+            yarnAllocator.allocateContainers(missingExecutorCount)
           }
           else sendProgress()
           Thread.sleep(sleepTime)
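
The new failure threshold keeps the old property name as a fallback, as the first hunk above shows; pulled out on its own, the computation is:

    import org.apache.spark.SparkConf

    // Prefer spark.yarn.max.executor.failures, fall back to the deprecated
    // spark.yarn.max.worker.failures, and default to max(numExecutors * 2, 3).
    def maxExecutorFailures(sparkConf: SparkConf, numExecutors: Int): Int = {
      sparkConf.getInt("spark.yarn.max.executor.failures",
        sparkConf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))
    }

    // e.g. with neither property set and --num-executors 3, the AM tolerates up to 6 failures.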

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
new file mode 100644
index 0000000..7b0e020
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.Socket
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import akka.actor._
+import akka.remote._
+import akka.actor.Terminated
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.SplitInfo
+
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
+
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private val rpc: YarnRPC = YarnRPC.create(conf)
+  private var resourceManager: AMRMProtocol = _
+  private var appAttemptId: ApplicationAttemptId = _
+  private var reporterThread: Thread = _
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var driverClosed:Boolean = false
+
+  val securityManager = new SecurityManager(sparkConf)
+  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+    conf = sparkConf, securityManager = securityManager)._1
+  var actor: ActorRef = _
+
+  // This actor just working as a monitor to watch on Driver Actor.
+  class MonitorActor(driverUrl: String) extends Actor {
+
+    var driver: ActorSelection = _
+
+    override def preStart() {
+      logInfo("Listen to driver: " + driverUrl)
+      driver = context.actorSelection(driverUrl)
+      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      driver ! "Hello"
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+
+    override def receive = {
+      case x: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+        driverClosed = true
+    }
+  }
+
+  def run() {
+
+    // Setup the directories so things go to yarn approved directories rather
+    // then user specified and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    appAttemptId = getApplicationAttemptId()
+    resourceManager = registerWithResourceManager()
+    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+    // Compute number of threads for akka
+    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+    if (minimumMemory > 0) {
+      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+      if (numCore > 0) {
+        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+        // TODO: Uncomment when hadoop is on a version which has this fixed.
+        // args.workerCores = numCore
+      }
+    }
+
+    waitForSparkMaster()
+
+    // Allocate all containers
+    allocateExecutors()
+
+    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+
+    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
+    reporterThread = launchReporterThread(interval)
+
+    // Wait for the reporter thread to Finish.
+    reporterThread.join()
+
+    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+    actorSystem.shutdown()
+
+    logInfo("Exited")
+    System.exit(0)
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
+    }
+  }
+
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val envs = System.getenv()
+    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
+    appAttemptId
+  }
+
+  private def registerWithResourceManager(): AMRMProtocol = {
+    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+    logInfo("Connecting to ResourceManager at " + rmAddress)
+    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+  }
+
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+      .asInstanceOf[RegisterApplicationMasterRequest]
+    appMasterRequest.setApplicationAttemptId(appAttemptId)
+    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
+    // Users can then monitor stderr/stdout on that node if required.
+    appMasterRequest.setHost(Utils.localHostName())
+    appMasterRequest.setRpcPort(0)
+    // What do we provide here ? Might make sense to expose something sensible later ?
+    appMasterRequest.setTrackingUrl("")
+    resourceManager.registerApplicationMaster(appMasterRequest)
+  }
+
+  private def waitForSparkMaster() {
+    logInfo("Waiting for spark driver to be reachable.")
+    var driverUp = false
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+    while(!driverUp) {
+      try {
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Master now available: " + driverHost + ":" + driverPort)
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+        Thread.sleep(100)
+      }
+    }
+    sparkConf.set("spark.driver.host",  driverHost)
+    sparkConf.set("spark.driver.port",  driverPort.toString)
+
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+  }
+
+
+  private def allocateExecutors() {
+
+    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map()
+
+    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
+      args, preferredNodeLocationData, sparkConf)
+
+    logInfo("Allocating " + args.numExecutors + " executors.")
+    // Wait until all containers have finished
+    // TODO: This is a bit ugly. Can we make it nicer?
+    // TODO: Handle container failure
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      Thread.sleep(100)
+    }
+
+    logInfo("All executors have launched.")
+
+  }
+
+  // TODO: We might want to extend this to allocate more containers in case they die !
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+
+    val t = new Thread {
+      override def run() {
+        while (!driverClosed) {
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+          if (missingExecutorCount > 0) {
+            logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+            yarnAllocator.allocateContainers(missingExecutorCount)
+          }
+          else sendProgress()
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // setting to daemon status, though this is usually not a good idea.
+    t.setDaemon(true)
+    t.start()
+    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    t
+  }
+
+  private def sendProgress() {
+    logDebug("Sending progress")
+    // simulated with an allocate request with no nodes requested ...
+    yarnAllocator.allocateContainers(0)
+  }
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+
+    logInfo("finish ApplicationMaster with " + status)
+    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+      .asInstanceOf[FinishApplicationMasterRequest]
+    finishReq.setAppAttemptId(appAttemptId)
+    finishReq.setFinishApplicationStatus(status)
+    resourceManager.finishApplicationMaster(finishReq)
+  }
+
+}
+
+
+object ExecutorLauncher {
+  def main(argStrings: Array[String]) {
+    val args = new ApplicationMasterArguments(argStrings)
+    new ExecutorLauncher(args).run()
+  }
+}
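
One detail of the launcher worth noting is the reporter cadence: it heartbeats at the configured scheduler interval but never lets more than half of the RM expiry window pass between allocate calls. With the defaults shown in run() above (sketch; values are the defaults, not a prescription):

    // Heartbeat interval used by the reporter thread.
    val timeoutIntervalMs   = 120000L   // default passed for YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
    val schedulerIntervalMs = 5000L     // spark.yarn.scheduler.heartbeat.interval-ms default
    val reportIntervalMs    = math.min(timeoutIntervalMs / 2, schedulerIntervalMs)  // => 5000 ms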

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
new file mode 100644
index 0000000..981e8b0
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
+
+import org.apache.spark.{SparkConf, Logging}
+
+
+class ExecutorRunnable(
+    container: Container,
+    conf: Configuration,
+    spConf: SparkConf,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    executorMemory: Int,
+    executorCores: Int)
+  extends Runnable with ExecutorRunnableUtil with Logging {
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  var cm: ContainerManager = _
+  val sparkConf = spConf
+  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  def run = {
+    logInfo("Starting Executor Container")
+    cm = connectToCM
+    startContainer
+  }
+
+  def startContainer = {
+    logInfo("Setting up ContainerLaunchContext")
+
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+
+    ctx.setContainerId(container.getId())
+    ctx.setResource(container.getResource())
+    val localResources = prepareLocalResources
+    ctx.setLocalResources(localResources)
+
+    val env = prepareEnvironment
+    ctx.setEnvironment(env)
+
+    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+    logInfo("Setting up executor with commands: " + commands)
+    ctx.setCommands(commands)
+
+    // Send the start request to the ContainerManager
+    val startReq = Records.newRecord(classOf[StartContainerRequest])
+    .asInstanceOf[StartContainerRequest]
+    startReq.setContainerLaunchContext(ctx)
+    cm.startContainer(startReq)
+  }
+
+  def connectToCM: ContainerManager = {
+    val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
+    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
+    logInfo("Connecting to ContainerManager at " + cmHostPortStr)
+
+    // Use doAs and remoteUser here so we can add the container token and not pollute the current
+    // users credentials with all of the individual container tokens
+    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+    val containerToken = container.getContainerToken()
+    if (containerToken != null) {
+      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+    }
+
+    val proxy = user
+        .doAs(new PrivilegedExceptionAction[ContainerManager] {
+          def run: ContainerManager = {
+            rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
+          }
+        })
+    proxy
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
deleted file mode 100644
index b735d01..0000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.SplitInfo
-
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
-  extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private val rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = _
-  private var appAttemptId: ApplicationAttemptId = _
-  private var reporterThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
-
-  val securityManager = new SecurityManager(sparkConf)
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf, securityManager = securityManager)._1
-  var actor: ActorRef = _
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-    }
-  }
-
-  def run() {
-
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
-    appAttemptId = getApplicationAttemptId()
-    resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-      }
-    }
-
-    waitForSparkMaster()
-
-    // Allocate all containers
-    allocateWorkers()
-
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-
-    // must be <= timeoutInterval / 2.
-    val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-    reporterThread = launchReporterThread(interval)
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
-
-    localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
-      case Some(l) => l
-    }
-  }
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
-  private def registerWithResourceManager(): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
-    resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
-
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-
-  private def allocateWorkers() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
-      args, preferredNodeLocationData, sparkConf)
-
-    logInfo("Allocating " + args.numWorkers + " workers.")
-    // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-    while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
-      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
-      Thread.sleep(100)
-    }
-
-    logInfo("All workers have launched.")
-
-  }
-
-  // TODO: We might want to extend this to allocate more containers in case they die !
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed) {
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
-            yarnAllocator.allocateContainers(missingWorkerCount)
-          }
-          else sendProgress()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateContainers(0)
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    resourceManager.finishApplicationMaster(finishReq)
-  }
-
-}
-
-
-object WorkerLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index 8c686e3..0000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-
-import org.apache.spark.{SparkConf, Logging}
-
-
-class WorkerRunnable(
-    container: Container,
-    conf: Configuration,
-    spConf: SparkConf,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    workerMemory: Int,
-    workerCores: Int) 
-  extends Runnable with WorkerRunnableUtil with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var cm: ContainerManager = _
-  val sparkConf = spConf
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Worker Container")
-    cm = connectToCM
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    ctx.setContainerId(container.getId())
-    ctx.setResource(container.getResource())
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    val env = prepareEnvironment
-    ctx.setEnvironment(env)
-
-    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
-    val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
-    logInfo("Setting up worker with commands: " + commands)
-    ctx.setCommands(commands)
-
-    // Send the start request to the ContainerManager
-    val startReq = Records.newRecord(classOf[StartContainerRequest])
-    .asInstanceOf[StartContainerRequest]
-    startReq.setContainerLaunchContext(ctx)
-    cm.startContainer(startReq)
-  }
-
-  def connectToCM: ContainerManager = {
-    val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
-    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
-    logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
-    // Use doAs and remoteUser here so we can add the container token and not pollute the current
-    // users credentials with all of the individual container tokens
-    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
-    val containerToken = container.getContainerToken()
-    if (containerToken != null) {
-      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
-    }
-
-    val proxy = user
-        .doAs(new PrivilegedExceptionAction[ContainerManager] {
-          def run: ContainerManager = {
-            rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
-          }
-        })
-    proxy
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e91257b..2056667 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
     val resourceManager: AMRMProtocol, 
     val appAttemptId: ApplicationAttemptId,
-    val maxWorkers: Int,
-    val workerMemory: Int,
-    val workerCores: Int,
+    val maxExecutors: Int,
+    val executorMemory: Int,
+    val executorCores: Int,
     val preferredHostToCount: Map[String, Int], 
     val preferredRackToCount: Map[String, Int],
     val sparkConf: SparkConf)
@@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
-  private val numWorkersRunning = new AtomicInteger()
-  // Used to generate a unique id per worker
-  private val workerIdCounter = new AtomicInteger()
+  private val numExecutorsRunning = new AtomicInteger()
+  // Used to generate a unique id per executor
+  private val executorIdCounter = new AtomicInteger()
   private val lastResponseId = new AtomicInteger()
-  private val numWorkersFailed = new AtomicInteger()
+  private val numExecutorsFailed = new AtomicInteger()
 
-  def getNumWorkersRunning: Int = numWorkersRunning.intValue
+  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
-  def getNumWorkersFailed: Int = numWorkersFailed.intValue
+  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
   }
 
-  def allocateContainers(workersToRequest: Int) {
+  def allocateContainers(executorsToRequest: Int) {
     // We need to send the request only once from what I understand ... but for now, not modifying
     // this much.
 
     // Keep polling the Resource Manager for containers
-    val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+    val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
 
     val _allocatedContainers = amResp.getAllocatedContainers()
 
     if (_allocatedContainers.size > 0) {
       logDebug("""
         Allocated containers: %d
-        Current worker count: %d
+        Current executor count: %d
         Containers released: %s
         Containers to be released: %s
         Cluster resources: %s
         """.format(
           _allocatedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers,
           amResp.getAvailableResources))
@@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler(
 
       // Run each of the allocated containers
       for (container <- allocatedContainers) {
-        val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
-        val workerHostname = container.getNodeId.getHost
+        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+        val executorHostname = container.getNodeId.getHost
         val containerId = container.getId
 
         assert(
-          container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+          container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
 
-        if (numWorkersRunningNow > maxWorkers) {
+        if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, workerHostname))
+            containers for it.""".format(containerId, executorHostname))
           releasedContainerList.add(containerId)
           // reset counter back to old value.
-          numWorkersRunning.decrementAndGet()
+          numExecutorsRunning.decrementAndGet()
         }
         else {
           // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
-          // (workerIdCounter)
-          val workerId = workerIdCounter.incrementAndGet().toString
+          // (executorIdCounter)
+          val executorId = executorIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
             sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
-          logInfo("launching container on " + containerId + " host " + workerHostname)
+          logInfo("launching container on " + containerId + " host " + executorHostname)
           // Just to be safe, simply remove it from pendingReleaseContainers.
           // Should not be there, but ..
           pendingReleaseContainers.remove(containerId)
 
-          val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+          val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
           allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
               new HashSet[ContainerId]())
 
             containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, workerHostname)
+            allocatedContainerToHostMap.put(containerId, executorHostname)
             if (rack != null) {
               allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
             }
           }
 
           new Thread(
-            new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
-              workerHostname, workerMemory, workerCores)
+            new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
+              executorHostname, executorMemory, executorCores)
           ).start()
         }
       }
       logDebug("""
         Finished processing %d containers.
-        Current number of workers running: %d,
+        Current number of executors running: %d,
         releasedContainerList: %s,
         pendingReleaseContainers: %s
         """.format(
           allocatedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers))
     }
@@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler(
         }
         else {
           // Simply decrement count - next iteration of ReporterThread will take care of allocating.
-          numWorkersRunning.decrementAndGet()
+          numExecutorsRunning.decrementAndGet()
           logInfo("Completed container %s (state: %s, exit status: %s)".format(
             containerId,
             completedContainer.getState,
@@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler(
           // now I think it's ok as none of the containers are expected to exit
           if (completedContainer.getExitStatus() != 0) {
             logInfo("Container marked as failed: " + containerId)
-            numWorkersFailed.incrementAndGet()
+            numExecutorsFailed.incrementAndGet()
           }
         }
 
@@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler(
       }
       logDebug("""
         Finished processing %d completed containers.
-        Current number of workers running: %d,
+        Current number of executors running: %d,
         releasedContainerList: %s,
         pendingReleaseContainers: %s
         """.format(
           completedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers))
     }
@@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler(
     retval
   }
 
-  private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
+  private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
 
     var resourceRequests: List[ResourceRequest] = null
 
       // default.
-    if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
+    if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+      logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
       resourceRequests = List(
-        createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
+        createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
     }
     else {
-      // request for all hosts in preferred nodes and for numWorkers - 
+      // request for all hosts in preferred nodes and for numExecutors - 
       // candidates.size, request by default allocation policy.
       val hostContainerRequests: ArrayBuffer[ResourceRequest] = 
         new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
@@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler(
       val anyContainerRequests: ResourceRequest = createResourceRequest(
         AllocationType.ANY,
         resource = null,
-        numWorkers,
+        numExecutors,
         YarnAllocationHandler.PRIORITY)
 
       val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler(
     val releasedContainerList = createReleasedContainerList()
     req.addAllReleases(releasedContainerList)
 
-    if (numWorkers > 0) {
-      logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
-        workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+    if (numExecutors > 0) {
+      logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
+        executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
     }
     else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
@@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler(
   private def createResourceRequest(
     requestType: AllocationType.AllocationType, 
     resource:String,
-    numWorkers: Int,
+    numExecutors: Int,
     priority: Int): ResourceRequest = {
 
     // If a hostname is specified, we need at least two requests - node local and rack local.
@@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler(
       case AllocationType.HOST => {
         assert(YarnAllocationHandler.ANY_HOST != resource)
         val hostname = resource
-        val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
+        val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
 
         // Add to host->rack mapping
         YarnAllocationHandler.populateRackInfo(conf, hostname)
@@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler(
       }
       case AllocationType.RACK => {
         val rack = resource
-        createResourceRequestImpl(rack, numWorkers, priority)
+        createResourceRequestImpl(rack, numExecutors, priority)
       }
       case AllocationType.ANY => createResourceRequestImpl(
-        YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+        YarnAllocationHandler.ANY_HOST, numExecutors, priority)
       case _ => throw new IllegalArgumentException(
         "Unexpected/unsupported request type: " + requestType)
     }
@@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler(
 
   private def createResourceRequestImpl(
     hostname:String,
-    numWorkers: Int,
+    numExecutors: Int,
     priority: Int): ResourceRequest = {
 
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
     // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
     rsrcRequest.setCapability(memCapability)
 
     val pri = Records.newRecord(classOf[Priority])
@@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler(
 
     rsrcRequest.setHostName(hostname)
 
-    rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
+    rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
     rsrcRequest
   }
 
@@ -560,9 +560,9 @@ object YarnAllocationHandler {
       conf,
       resourceManager,
       appAttemptId,
-      args.numWorkers, 
-      args.workerMemory,
-      args.workerCores,
+      args.numExecutors, 
+      args.executorMemory,
+      args.executorCores,
       Map[String, Int](),
       Map[String, Int](),
       sparkConf)
@@ -582,9 +582,9 @@ object YarnAllocationHandler {
       conf,
       resourceManager,
       appAttemptId,
-      args.numWorkers, 
-      args.workerMemory,
-      args.workerCores,
+      args.numExecutors, 
+      args.executorMemory,
+      args.executorCores,
       hostToCount,
       rackToCount,
       sparkConf)
@@ -594,9 +594,9 @@ object YarnAllocationHandler {
     conf: Configuration,
     resourceManager: AMRMProtocol,
     appAttemptId: ApplicationAttemptId,
-    maxWorkers: Int,
-    workerMemory: Int,
-    workerCores: Int,
+    maxExecutors: Int,
+    executorMemory: Int,
+    executorCores: Int,
     map: collection.Map[String, collection.Set[SplitInfo]],
     sparkConf: SparkConf): YarnAllocationHandler = {
 
@@ -606,9 +606,9 @@ object YarnAllocationHandler {
       conf,
       resourceManager,
       appAttemptId,
-      maxWorkers,
-      workerMemory,
-      workerCores,
+      maxExecutors,
+      executorMemory,
+      executorCores,
       hostToCount,
       rackToCount,
       sparkConf)
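
As a side note on the bookkeeping above: the renamed fields are plain AtomicIntegers, so a slot is
reserved with incrementAndGet before a container is launched and rolled back with decrementAndGet
when the container is rejected or completes. A minimal, self-contained sketch of that pattern
(ExecutorBookkeeping and tryLaunch are illustrative names, not part of this patch):

    import java.util.concurrent.atomic.AtomicInteger

    object ExecutorBookkeeping {
      private val numExecutorsRunning = new AtomicInteger()
      private val executorIdCounter = new AtomicInteger()

      // Reserve a slot; if that puts us over the limit, undo the reservation,
      // mirroring the "reset counter back to old value" branch above.
      def tryLaunch(maxExecutors: Int): Option[String] = {
        if (numExecutorsRunning.incrementAndGet() > maxExecutors) {
          numExecutorsRunning.decrementAndGet()
          None
        } else {
          // A separate counter hands out ids so deallocate/allocate cycles never reuse one.
          Some(executorIdCounter.incrementAndGet().toString)
        }
      }

      def main(args: Array[String]): Unit = {
        println((1 to 3).map(_ => tryLaunch(maxExecutors = 2)))
        // Vector(Some(1), Some(2), None)
      }
    }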

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f76a5dd..25cc901 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userJar: String = null
   var userClass: String = null
   var userArgs: Seq[String] = Seq[String]()
-  var workerMemory = 1024
-  var workerCores = 1
-  var numWorkers = 2
+  var executorMemory = 1024
+  var executorCores = 1
+  var numExecutors = 2
 
   parseArgs(args.toList)
   
@@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
     var args = inputArgs
 
     while (! args.isEmpty) {
-
+      // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0;
+      // use --num-executors, --executor-memory, and --executor-cores instead.
       args match {
         case ("--jar") :: value :: tail =>
           userJar = value
@@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userArgsBuffer += value
           args = tail
 
-        case ("--num-workers") :: IntParam(value) :: tail =>
-          numWorkers = value
+        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+          numExecutors = value
           args = tail
 
-        case ("--worker-memory") :: IntParam(value) :: tail =>
-          workerMemory = value
+        case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
+          executorMemory = value
           args = tail
 
-        case ("--worker-cores") :: IntParam(value) :: tail =>
-          workerCores = value
+        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
+          executorCores = value
           args = tail
 
         case Nil =>
@@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
       "  --class CLASS_NAME   Name of your application's main class (required)\n" +
       "  --args ARGS          Arguments to be passed to your application's main class.\n" +
       "                       Mutliple invocations are possible, each will be passed in order.\n" +
-      "  --num-workers NUM    Number of workers to start (Default: 2)\n" +
-      "  --worker-cores NUM   Number of cores for the workers (Default: 1)\n" +
-      "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
+      "  --num-executors NUM    Number of executors to start (Default: 2)\n" +
+      "  --executor-cores NUM   Number of cores for the executors (Default: 1)\n" +
+      "  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
     System.exit(exitCode)
   }
 }
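
The backwards-compatible flags above lean on Scala's list extractors with an alternation in the
head position, so either spelling binds the same variable. A rough, self-contained sketch of the
idea (ParseDemo and the local IntParam stand-in are illustrative; Spark ships its own IntParam
extractor):

    object ParseDemo {
      // Local stand-in for Spark's IntParam extractor.
      object IntParam {
        def unapply(s: String): Option[Int] = scala.util.Try(s.toInt).toOption
      }

      // Either spelling of the flag binds `value`; unrecognized tokens are skipped.
      def parse(args: List[String], numExecutors: Int = 2): Int = args match {
        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
          parse(tail, value)
        case _ :: tail => parse(tail, numExecutors)
        case Nil => numExecutors
      }

      def main(args: Array[String]): Unit = {
        println(parse(List("--num-workers", "4")))    // 4 (deprecated spelling still accepted)
        println(parse(List("--num-executors", "8")))  // 8
        println(parse(Nil))                           // 2 (default)
      }
    }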

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1f894a6..a001060 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var userJar: String = null
   var userClass: String = null
   var userArgs: Seq[String] = Seq[String]()
-  var workerMemory = 1024 // MB
-  var workerCores = 1
-  var numWorkers = 2
+  var executorMemory = 1024 // MB
+  var executorCores = 1
+  var numExecutors = 2
   var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
@@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
           userArgsBuffer += value
           args = tail
 
-        case ("--master-class") :: value :: tail =>
+        case ("--master-class" | "--am-class") :: value :: tail =>
+          if (args(0) == "--master-class") {
+            println("--master-class is deprecated. Use --am-class instead.")
+          }
           amClass = value
           args = tail
 
-        case ("--master-memory") :: MemoryParam(value) :: tail =>
+        case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
+          if (args(0) == "--master-memory") {
+            println("--master-memory is deprecated. Use --driver-memory instead.")
+          }
           amMemory = value
           args = tail
 
-        case ("--num-workers") :: IntParam(value) :: tail =>
-          numWorkers = value
+        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+          if (args(0) == "--num-workers") {
+            println("--num-workers is deprecated. Use --num-executors instead.")
+          }
+          numExecutors = value
           args = tail
 
-        case ("--worker-memory") :: MemoryParam(value) :: tail =>
-          workerMemory = value
+        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
+          if (args(0) == "--worker-memory") {
+            println("--worker-memory is deprecated. Use --executor-memory instead.")
+          }
+          executorMemory = value
           args = tail
 
-        case ("--worker-cores") :: IntParam(value) :: tail =>
-          workerCores = value
+        case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail =>
+          if (args(0) == "--worker-cores") {
+            println("--worker-cores is deprecated. Use --executor-cores instead.")
+          }
+          executorCores = value
           args = tail
 
         case ("--queue") :: value :: tail =>
@@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --class CLASS_NAME         Name of your application's main class (required)\n" +
       "  --args ARGS                Arguments to be passed to your application's main class.\n" +
       "                             Mutliple invocations are possible, each will be passed in order.\n" +
-      "  --num-workers NUM          Number of workers to start (Default: 2)\n" +
-      "  --worker-cores NUM         Number of cores for the workers (Default: 1).\n" +
-      "  --master-class CLASS_NAME  Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
-      "  --master-memory MEM        Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
-      "  --worker-memory MEM        Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+      "  --num-executors NUM        Number of executors to start (Default: 2)\n" +
+      "  --executor-cores NUM       Number of cores for the executors (Default: 1).\n" +
+      "  --driver-memory MEM        Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+      "  --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" +
       "  --name NAME                The name of your application (Default: Spark)\n" +
       "  --queue QUEUE              The hadoop queue to use for allocation requests (Default: 'default')\n" +
       "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 74c5e0f..57e5761 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -73,10 +73,10 @@ trait ClientBase extends Logging {
       ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
           "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
+      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
         "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +
+      (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size " +
         "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
     ).foreach { case(cond, errStr) =>
       if (cond) {
@@ -95,9 +95,9 @@ trait ClientBase extends Logging {
     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
 
     // If we have requested more than the cluster's max for a single resource then exit.
-    if (args.workerMemory > maxMem) {
-      logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.workerMemory, maxMem))
+    if (args.executorMemory > maxMem) {
+      logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
+        format(args.executorMemory, maxMem))
       System.exit(1)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -276,7 +276,7 @@ trait ClientBase extends Logging {
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
 
-    // Set the environment variables to be passed on to the Workers.
+    // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
@@ -360,9 +360,9 @@ trait ClientBase extends Logging {
         " --class " + args.userClass +
         " --jar " + args.userJar +
         userArgsToString(args) +
-        " --worker-memory " + args.workerMemory +
-        " --worker-cores " + args.workerCores +
-        " --num-workers " + args.numWorkers +
+        " --executor-memory " + args.executorMemory +
+        " --executor-cores " + args.executorCores +
+        " --num-executors " + args.numExecutors +
         " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
         " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 535abbf..68cda0f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Add a resource to the list of distributed cache resources. This list can
-   * be sent to the ApplicationMaster and possibly the workers so that it can 
+   * be sent to the ApplicationMaster and possibly the executors so that it can
    * be downloaded into the Hadoop distributed cache for use by this application.
    * Adds the LocalResource to the localResources HashMap passed in and saves 
-   * the stats of the resources to they can be sent to the workers and verified.
+   * the stats of the resources so they can be sent to the executors and verified.
    *
    * @param fs FileSystem
    * @param conf Configuration

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
new file mode 100644
index 0000000..da0a6f7
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+
+trait ExecutorRunnableUtil extends Logging {
+
+  val yarnConf: YarnConfiguration
+  val sparkConf: SparkConf
+  lazy val env = prepareEnvironment
+
+  def prepareCommand(
+      masterAddress: String,
+      slaveId: String,
+      hostname: String,
+      executorMemory: Int,
+      executorCores: Int) = {
+    // Extra options for the JVM
+    var JAVA_OPTS = ""
+    // Set the JVM memory
+    val executorMemoryString = executorMemory + "m"
+    JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+    }
+
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
+    // Commenting it out for now - so that people can refer to the properties if required. Remove
+    // it once cpuset version is pushed out.
+    // The context is, default gc for server class machines ends up using all cores to do gc - hence
+    // if there are multiple containers on the same node, spark gc affects all other containers'
+    // performance (which can also be other spark containers)
+    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+    // of cores on a node.
+    /*
+        else {
+          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+          // It might be possible that other modes/config are being set in SPARK_JAVA_OPTS, so we don't
+          // want to mess with it.
+          // In our experiments, using the (default) throughput collector has severe perf ramifications in
+          // multi-tenant machines
+          // The options are based on
+          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+          JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+          JAVA_OPTS += " -XX:+CMSIncrementalMode "
+          JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+          JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+          JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+        }
+    */
+
+    var javaCommand = "java"
+    val javaHome = System.getenv("JAVA_HOME")
+    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand +
+      " -server " +
+      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
+      // an inconsistent state.
+      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
+      " -XX:OnOutOfMemoryError='kill %p' " +
+      JAVA_OPTS +
+      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
+      masterAddress + " " +
+      slaveId + " " +
+      hostname + " " +
+      executorCores +
+      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    commands
+  }
+
+  private def setupDistributedCache(
+      file: String,
+      rtype: LocalResourceType,
+      localResources: HashMap[String, LocalResource],
+      timestamp: String,
+      size: String, 
+      vis: String) = {
+    val uri = new URI(file)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(rtype)
+    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+    amJarRsrc.setTimestamp(timestamp.toLong)
+    amJarRsrc.setSize(size.toLong)
+    localResources(uri.getFragment()) = amJarRsrc
+  }
+
+  def prepareLocalResources: HashMap[String, LocalResource] = {
+    logInfo("Preparing Local resources")
+    val localResources = HashMap[String, LocalResource]()
+
+    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+      for( i <- 0 to distFiles.length - 1) {
+        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+          fileSizes(i), visibilities(i))
+      }
+    }
+
+    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+      for( i <- 0 to distArchives.length - 1) {
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+          timeStamps(i), fileSizes(i), visibilities(i))
+      }
+    }
+
+    logInfo("Prepared Local resources " + localResources)
+    localResources
+  }
+
+  def prepareEnvironment: HashMap[String, String] = {
+    val env = new HashMap[String, String]()
+
+    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+
+    // Allow users to specify some environment variables
+    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
+    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+    env
+  }
+
+}
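
prepareLocalResources above rebuilds the distributed-cache metadata from four parallel,
comma-separated environment variables that the client sets. A tiny sketch of that parallel-list
walk, with hard-coded strings standing in for the SPARK_YARN_CACHE_FILES* variables:

    object CacheEnvDemo {
      def main(args: Array[String]): Unit = {
        // Stand-ins for SPARK_YARN_CACHE_FILES and its companion variables.
        val distFiles    = "hdfs:///tmp/app.jar#app.jar,hdfs:///tmp/conf.xml#conf.xml".split(',')
        val timeStamps   = "1394000000000,1394000000001".split(',')
        val fileSizes    = "1024,2048".split(',')
        val visibilities = "PUBLIC,PRIVATE".split(',')

        for (i <- 0 until distFiles.length) {
          val uri = new java.net.URI(distFiles(i))
          // The URI fragment (after '#') is the link name the resource gets in the container.
          println(uri.getFragment + ": size=" + fileSizes(i) +
            ", ts=" + timeStamps(i) + ", vis=" + visibilities(i))
        }
      }
    }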