Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/05 08:33:16 UTC

[05/14] git commit: Add a "new-yarn" directory in SPARK_HOME, intended to contain Hadoop-2.2 API changes.

Add a "new-yarn" directory in SPARK_HOME, intended to contain Hadoop-2.2 API changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ab8652f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ab8652f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ab8652f2

Branch: refs/heads/master
Commit: ab8652f2d3bf4aa28430867a83cad5ca0f9c1091
Parents: a67ebf4
Author: Harvey Feng <ha...@databricks.com>
Authored: Thu Nov 21 03:58:08 2013 -0800
Committer: Harvey Feng <ha...@databricks.com>
Committed: Sat Nov 23 17:08:30 2013 -0800

----------------------------------------------------------------------
 new-yarn/pom.xml                                | 161 +++++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 477 +++++++++++++
 .../yarn/ApplicationMasterArguments.scala       |  94 +++
 .../org/apache/spark/deploy/yarn/Client.scala   | 497 ++++++++++++++
 .../spark/deploy/yarn/ClientArguments.scala     | 139 ++++
 .../yarn/ClientDistributedCacheManager.scala    | 228 +++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      | 235 +++++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 673 +++++++++++++++++++
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  43 ++
 .../cluster/YarnClusterScheduler.scala          |  55 ++
 .../ClientDistributedCacheManagerSuite.scala    | 220 ++++++
 11 files changed, 2822 insertions(+)
----------------------------------------------------------------------
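
For context, the pieces listed above fit together roughly as follows: a client submits the application to the YARN ResourceManager, the ApplicationMaster starts the user class and asks the YarnAllocationHandler for worker containers, and WorkerRunnable launches each container. As a rough, purely illustrative sketch (all jar paths, class names and sizes below are made up), the client-side entry point mirrors Client.main and the options parsed by ClientArguments further down; SPARK_JAR must also point at the Spark assembly jar in the environment, or validateArgs() exits:

    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    System.setProperty("SPARK_YARN_MODE", "true")    // Client.main sets this before submitting
    val clientArgs = new ClientArguments(Array(
      "--jar", "my-app.jar",                         // required
      "--class", "org.example.MyApp",                // required
      "--args", "hdfs:///input",                     // may be repeated; values are passed in order
      "--num-workers", "4",
      "--worker-memory", "2G",
      "--worker-cores", "1"))
    new Client(clientArgs).run()                     // submits to the RM and monitors the application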


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml
new file mode 100644
index 0000000..8a065c6
--- /dev/null
+++ b/new-yarn/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>0.9.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-yarn_2.9.3</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project YARN Support</name>
+  <url>http://spark.incubator.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.9.3</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <tasks>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
+                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">  
+                  <condition>
+                    <not>
+                      <or>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
+                      </or>
+                    </not>
+                  </condition>
+                </fail>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <SPARK_HOME>${basedir}/..</SPARK_HOME>
+            <SPARK_TESTING>1</SPARK_TESTING>
+            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+          </environmentVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..9c43a72
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.Socket
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.Utils
+
+
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  
+  private var rpc: YarnRPC = YarnRPC.create(conf)
+  private var resourceManager: AMRMProtocol = _
+  private var appAttemptId: ApplicationAttemptId = _
+  private var userThread: Thread = _
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  private val fs = FileSystem.get(yarnConf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var isFinished: Boolean = false
+  private var uiAddress: String = _
+  private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+    YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+  private var isLastAMRetry: Boolean = true
+  // default to numWorkers * 2, with minimum of 3
+  private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3).toString()).toInt
+
+  def run() {
+    // Set up the directories so things go to YARN-approved directories rather
+    // than user-specified ones and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce uses.
+    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+    
+    appAttemptId = getApplicationAttemptId()
+    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
+    resourceManager = registerWithResourceManager()
+
+    // Workaround until Hadoop moves to something which has
+    // https://issues.apache.org/jira/browse/HADOOP-8406 (fixed in 2.0.2-alpha, but not in the 0.23 line);
+    // ignore the result.
+    // This does not, unfortunately, always work reliably ... but it alleviates the bug a lot of the time.
+    // Hence the args.workerCores = numCore assignment below stays disabled. Any better option?
+
+    // Compute number of threads for akka
+    //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+    //if (minimumMemory > 0) {
+    //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    //  val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+    //  if (numCore > 0) {
+        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+        // TODO: Uncomment when hadoop is on a version which has this fixed.
+        // args.workerCores = numCore
+    //  }
+    //}
+    // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+    
+    ApplicationMaster.register(this)
+    // Start the user's JAR
+    userThread = startUserClass()
+    
+    // This is a bit hacky, but we need to wait until the spark.driver.port property has
+    // been set by the Thread executing the user class.
+    waitForSparkMaster()
+
+    waitForSparkContextInitialized()
+
+    // Do this after spark master is up and SparkContext is created so that we can register UI Url
+    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    
+    // Allocate all containers
+    allocateWorkers()
+    
+    // Wait for the user class to finish.
+    userThread.join()
+
+    System.exit(0)
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so let's check both. We assume one of the two is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+        .getOrElse(""))
+
+    if (localDirs.isEmpty()) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
+  }
+  
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val envs = System.getenv()
+    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
+    appAttemptId
+  }
+  
+  private def registerWithResourceManager(): AMRMProtocol = {
+    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+    logInfo("Connecting to ResourceManager at " + rmAddress)
+    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+  }
+  
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+      .asInstanceOf[RegisterApplicationMasterRequest]
+    appMasterRequest.setApplicationAttemptId(appAttemptId)
+    // Setting this to master host,port - so that the ApplicationReport at client has some
+    // sensible info. 
+    // Users can then monitor stderr/stdout on that node if required.
+    appMasterRequest.setHost(Utils.localHostName())
+    appMasterRequest.setRpcPort(0)
+    appMasterRequest.setTrackingUrl(uiAddress)
+    resourceManager.registerApplicationMaster(appMasterRequest)
+  }
+  
+  private def waitForSparkMaster() {
+    logInfo("Waiting for spark driver to be reachable.")
+    var driverUp = false
+    var tries = 0
+    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+    while(!driverUp && tries < numTries) {
+      val driverHost = System.getProperty("spark.driver.host")
+      val driverPort = System.getProperty("spark.driver.port")
+      try {
+        val socket = new Socket(driverHost, driverPort.toInt)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception => {
+          logWarning("Failed to connect to driver at %s:%s, retrying ...").
+            format(driverHost, driverPort)
+          Thread.sleep(100)
+          tries = tries + 1
+        }
+      }
+    }
+  }
+
+  private def startUserClass(): Thread  = {
+    logInfo("Starting the user JAR in a separate Thread")
+    val mainMethod = Class.forName(
+      args.userClass,
+      false /* initialize */,
+      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+    val t = new Thread {
+      override def run() {
+        var successed = false
+        try {
+          // Copy
+          var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+          mainMethod.invoke(null, mainArgs)
+          // Some job scripts end with "System.exit(0)", for example SparkPi and SparkLR.
+          // The user thread will stop here unless it throws an uncaught exception;
+          // in the System.exit case a shutdown hook is needed to set SUCCEEDED.
+          successed = true
+        } finally {
+          logDebug("finishing main")
+          isLastAMRetry = true
+          if (successed) {
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          } else {
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+          }
+        }
+      }
+    }
+    t.start()
+    t
+  }
+
+  // This needs to happen before allocateWorkers.
+  private def waitForSparkContextInitialized() {
+    logInfo("Waiting for spark context initialization")
+    try {
+      var sparkContext: SparkContext = null
+      ApplicationMaster.sparkContextRef.synchronized {
+        var count = 0
+        val waitTime = 10000L
+        val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+          logInfo("Waiting for spark context initialization ... " + count)
+          count = count + 1
+          ApplicationMaster.sparkContextRef.wait(waitTime)
+        }
+        sparkContext = ApplicationMaster.sparkContextRef.get()
+        assert(sparkContext != null || count >= numTries)
+
+        if (null != sparkContext) {
+          uiAddress = sparkContext.ui.appUIAddress
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(
+            yarnConf,
+            resourceManager,
+            appAttemptId,
+            args, 
+            sparkContext.preferredNodeLocationData) 
+        } else {
+          logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
+            format(count * waitTime, numTries))
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(
+            yarnConf,
+            resourceManager,
+            appAttemptId,
+            args)
+        }
+      }
+    } finally {
+      // In case of exceptions, etc., ensure that the count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+  }
+
+  private def allocateWorkers() {
+    try {
+      logInfo("Allocating " + args.numWorkers + " workers.")
+      // Wait until all containers have finished
+      // TODO: This is a bit ugly. Can we make it nicer?
+      // TODO: Handle container failure
+
+      // Exit the loop if the user thread exits.
+      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+          finishApplicationMaster(FinalApplicationStatus.FAILED,
+            "max number of worker failures reached")
+        }
+        yarnAllocator.allocateContainers(
+          math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+        ApplicationMaster.incrementAllocatorLoop(1)
+        Thread.sleep(100)
+      }
+    } finally {
+      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+    logInfo("All workers have launched.")
+
+    // Launch a progress reporter thread, else the app will get killed after expiration
+    // (def: 10mins) timeout.
+    // TODO(harvey): Verify the timeout
+    if (userThread.isAlive) {
+      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+      // we want to be reasonably responsive without causing too many requests to RM.
+      val schedulerInterval =
+        System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+      // must be <= timeoutInterval / 2.
+      val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
+      launchReporterThread(interval)
+    }
+  }
+
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+
+    val t = new Thread {
+      override def run() {
+        while (userThread.isAlive) {
+          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+            finishApplicationMaster(FinalApplicationStatus.FAILED,
+              "max number of worker failures reached")
+          }
+          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+          if (missingWorkerCount > 0) {
+            logInfo("Allocating %d containers to make up for (potentially) lost containers".
+              format(missingWorkerCount))
+            yarnAllocator.allocateContainers(missingWorkerCount)
+          }
+          else sendProgress()
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // Setting to daemon status, though this is usually not a good idea.
+    t.setDaemon(true)
+    t.start()
+    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    t
+  }
+
+  private def sendProgress() {
+    logDebug("Sending progress")
+    // Simulated with an allocate request with no nodes requested ...
+    yarnAllocator.allocateContainers(0)
+  }
+
+  /*
+  def printContainers(containers: List[Container]) = {
+    for (container <- containers) {
+      logInfo("Launching shell command on a new container."
+        + ", containerId=" + container.getId()
+        + ", containerNode=" + container.getNodeId().getHost() 
+        + ":" + container.getNodeId().getPort()
+        + ", containerNodeURI=" + container.getNodeHttpAddress()
+        + ", containerState" + container.getState()
+        + ", containerResourceMemory"  
+        + container.getResource().getMemory())
+    }
+  }
+  */
+
+  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      isFinished = true
+    }
+
+    logInfo("finishApplicationMaster with " + status)
+    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+      .asInstanceOf[FinishApplicationMasterRequest]
+    finishReq.setAppAttemptId(appAttemptId)
+    finishReq.setFinishApplicationStatus(status)
+    finishReq.setDiagnostics(diagnostics)
+    // Set tracking url to empty since we don't have a history server.
+    finishReq.setTrackingUrl("")
+    resourceManager.finishApplicationMaster(finishReq)
+  }
+
+  /**
+   * Clean up the staging directory. 
+   */
+  private def cleanupStagingDir() { 
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  // The shutdown hook that runs when a signal is received AND during normal close of the JVM. 
+  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+    def run() {
+      logInfo("AppMaster received a signal.")
+      // we need to clean up staging dir before HDFS is shut down
+      // make sure we don't delete it until this is the last AM
+      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+    }
+  }
+}
+
+object ApplicationMaster {
+  // Number of times to wait for the allocator loop to complete.
+  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
+  // This is to ensure that we have a reasonable number of containers before we start.
+  // TODO: Currently, the task-to-container mapping is computed once (in TaskSetManager), which need
+  // not be optimal as more containers become available. Might need to handle this better.
+  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+  def incrementAllocatorLoop(by: Int) {
+    val count = yarnAllocatorLoop.getAndAdd(by)
+    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
+      yarnAllocatorLoop.synchronized {
+        // to wake threads off wait ...
+        yarnAllocatorLoop.notifyAll()
+      }
+    }
+  }
+
+  private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+
+  def register(master: ApplicationMaster) {
+    applicationMasters.add(master)
+  }
+
+  val sparkContextRef: AtomicReference[SparkContext] =
+    new AtomicReference[SparkContext](null /* initialValue */)
+  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+
+  def sparkContextInitialized(sc: SparkContext): Boolean = {
+    var modified = false
+    sparkContextRef.synchronized {
+      modified = sparkContextRef.compareAndSet(null, sc)
+      sparkContextRef.notifyAll()
+    }
+
+    // Add a shutdown hook - as a best-effort measure in case users do not call sc.stop or
+    // System.exit.
+    // We should not really have to do this, but it helps YARN to evict resources earlier.
+    // Not to mention, it prevents the Client from declaring failure even though we exited properly.
+    // Note that this will unfortunately not properly clean up the staging files because it gets
+    // called too late, after the filesystem has already been shut down.
+    if (modified) {
+      Runtime.getRuntime().addShutdownHook(new Thread with Logging { 
+        // This not only logs, but also ensures that the log system is initialized for this
+        // instance by the time we are actually 'run'-ing.
+        logInfo("Adding shutdown hook for context " + sc)
+        override def run() { 
+          logInfo("Invoking sc stop from shutdown hook") 
+          sc.stop() 
+          // Best case ...
+          for (master <- applicationMasters) {
+            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          }
+        } 
+      } )
+    }
+
+    // Wait for initialization to complete and for at least 'some' nodes to be allocated.
+    yarnAllocatorLoop.synchronized {
+      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
+        yarnAllocatorLoop.wait(1000L)
+      }
+    }
+    modified
+  }
+
+  def main(argStrings: Array[String]) {
+    val args = new ApplicationMasterArguments(argStrings)
+    new ApplicationMaster(args).run()
+  }
+}
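
The AM above blocks in waitForSparkContextInitialized() until the SparkContext created by the user class is published through sparkContextRef. The hand-off is expected to come from the scheduler side (YarnClusterScheduler in this patch set, not shown in this hunk); a hypothetical sketch of that call, assuming it runs in the same JVM as the user class:

    // Hypothetical post-start hook; the real wiring lives in YarnClusterScheduler.
    def postStartHook(sc: org.apache.spark.SparkContext) {
      // Hands the freshly created SparkContext to the AM and blocks until the
      // allocator loop above has made some progress (the yarnAllocatorLoop handshake).
      org.apache.spark.deploy.yarn.ApplicationMaster.sparkContextInitialized(sc)
    }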

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
new file mode 100644
index 0000000..f76a5dd
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.spark.util.IntParam
+import collection.mutable.ArrayBuffer
+
+class ApplicationMasterArguments(val args: Array[String]) {
+  var userJar: String = null
+  var userClass: String = null
+  var userArgs: Seq[String] = Seq[String]()
+  var workerMemory = 1024
+  var workerCores = 1
+  var numWorkers = 2
+
+  parseArgs(args.toList)
+  
+  private def parseArgs(inputArgs: List[String]): Unit = {
+    val userArgsBuffer = new ArrayBuffer[String]()
+
+    var args = inputArgs
+
+    while (! args.isEmpty) {
+
+      args match {
+        case ("--jar") :: value :: tail =>
+          userJar = value
+          args = tail
+
+        case ("--class") :: value :: tail =>
+          userClass = value
+          args = tail
+
+        case ("--args") :: value :: tail =>
+          userArgsBuffer += value
+          args = tail
+
+        case ("--num-workers") :: IntParam(value) :: tail =>
+          numWorkers = value
+          args = tail
+
+        case ("--worker-memory") :: IntParam(value) :: tail =>
+          workerMemory = value
+          args = tail
+
+        case ("--worker-cores") :: IntParam(value) :: tail =>
+          workerCores = value
+          args = tail
+
+        case Nil =>
+          if (userJar == null || userClass == null) {
+            printUsageAndExit(1)
+          }
+
+        case _ =>
+          printUsageAndExit(1, args)
+      }
+    }
+
+    userArgs = userArgsBuffer.readOnly
+  }
+  
+  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+    if (unknownParam != null) {
+      System.err.println("Unknown/unsupported param " + unknownParam)
+    }
+    System.err.println(
+      "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
+      "Options:\n" +
+      "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
+      "  --class CLASS_NAME   Name of your application's main class (required)\n" +
+      "  --args ARGS          Arguments to be passed to your application's main class.\n" +
+      "                       Mutliple invocations are possible, each will be passed in order.\n" +
+      "  --num-workers NUM    Number of workers to start (Default: 2)\n" +
+      "  --worker-cores NUM   Number of cores for the workers (Default: 1)\n" +
+      "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
+    System.exit(exitCode)
+  }
+}
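
As a quick illustration of the parser above (all values are hypothetical), the AM is launched with a flat argument list assembled by the client, and repeated --args flags are collected into userArgs in order:

    val amArgs = new ApplicationMasterArguments(Array(
      "--jar", "app.jar",
      "--class", "org.example.MyApp",
      "--args", "first", "--args", "second",   // collected into userArgs in order
      "--num-workers", "4",
      "--worker-memory", "2048",               // parsed by IntParam, i.e. a plain MB value here
      "--worker-cores", "2"))
    // amArgs.userArgs is now Seq("first", "second")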

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
new file mode 100644
index 0000000..86310f3
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.{InetAddress, UnknownHostException, URI}
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.YarnClientImpl
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, Records}
+
+import org.apache.spark.Logging 
+import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
+
+
+class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+
+  def this(args: ClientArguments) = this(new Configuration(), args)
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+  private val SPARK_STAGING: String = ".sparkStaging"
+  private val distCacheMgr = new ClientDistributedCacheManager()
+
+  // Staging directory is private! -> rwx------
+  val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
+  // App files are world readable and owner writable -> rw-r--r--
+  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
+
+  def run() {
+    validateArgs()
+
+    init(yarnConf)
+    start()
+    logClusterResourceDetails()
+
+    val newApp = super.getNewApplication()
+    val appId = newApp.getApplicationId()
+
+    verifyClusterResources(newApp)
+    val appContext = createApplicationSubmissionContext(appId)
+    val appStagingDir = getAppStagingDir(appId)
+    val localResources = prepareLocalResources(appStagingDir)
+    val env = setupLaunchEnv(localResources, appStagingDir)
+    val amContainer = createContainerLaunchContext(newApp, localResources, env)
+
+    appContext.setQueue(args.amQueue)
+    appContext.setAMContainerSpec(amContainer)
+    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
+    submitApp(appContext)
+
+    monitorApplication(appId)
+    System.exit(0)
+  }
+
+  def validateArgs() = {
+    Map(
+      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+      (args.userJar == null) -> "Error: You must specify a user jar!",
+      (args.userClass == null) -> "Error: You must specify a user class!",
+      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> "Error: AM memory size must be +
+        greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD,
+      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> "Error: Worker memory size +
+        must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString
+    .foreach { case(cond, errStr) => 
+      if (cond) {
+        logError(errStr)
+        args.printUsageAndExit(1)
+      }
+    }
+  }
+
+  def getAppStagingDir(appId: ApplicationId): String = {
+    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+  }
+
+  def logClusterResourceDetails() {
+    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+    logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
+      clusterMetrics.getNumNodeManagers)
+
+    val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
+    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+      queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+        queueInfo.getQueueName,
+        queueInfo.getCurrentCapacity,
+        queueInfo.getMaximumCapacity,
+        queueInfo.getApplications.size,
+        queueInfo.getChildQueues.size))
+  }
+
+  def verifyClusterResources(app: GetNewApplicationResponse) = { 
+    val maxMem = app.getMaximumResourceCapability().getMemory()
+    logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
+
+    // If we have requested more than the cluster's max for a single resource, then exit.
+    if (args.workerMemory > maxMem) {
+      logError("The worker size is too large to run on this cluster " + args.workerMemory)
+      System.exit(1)
+    }
+    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    if (amMem > maxMem) {
+      logError("AM size is too large to run on this cluster " + amMem)
+      System.exit(1)
+    }
+
+    // We could add checks to make sure the entire cluster has enough resources, but that involves
+    // getting all the node reports and computing it ourselves.
+  }
+
+  def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
+    logInfo("Setting up application submission context for ASM")
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    appContext.setApplicationId(appId)
+    appContext.setApplicationName(args.appName)
+    return appContext
+  }
+
+  /** See if two file systems are the same or not. */
+  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+    if (srcUri.getScheme() == null) {
+      return false
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false
+    }
+    var srcHost = srcUri.getHost()
+    var dstHost = dstUri.getHost()
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+      } catch {
+        case e: UnknownHostException =>
+          return false
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false
+    } else if (srcHost != null && dstHost == null) {
+      return false
+    }
+    //check for ports
+    if (srcUri.getPort() != dstUri.getPort()) {
+      return false
+    }
+    return true
+  }
+
+  /** Copy the file into HDFS if needed. */
+  private def copyRemoteFile(
+      dstDir: Path,
+      originalPath: Path,
+      replication: Short,
+      setPerms: Boolean = false): Path = {
+    val fs = FileSystem.get(conf)
+    val remoteFs = originalPath.getFileSystem(conf)
+    var newPath = originalPath
+    if (! compareFs(remoteFs, fs)) {
+      newPath = new Path(dstDir, originalPath.getName())
+      logInfo("Uploading " + originalPath + " to " + newPath)
+      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+      fs.setReplication(newPath, replication)
+      if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+    } 
+    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
+    // version shows the specific version in the distributed cache configuration
+    val qualPath = fs.makeQualified(newPath)
+    val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+    val destPath = fc.resolvePath(qualPath)
+    destPath
+  }
+
+  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+    logInfo("Preparing Local resources")
+    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+    // local resources to the AM.
+    val fs = FileSystem.get(conf)
+
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+        logError("Can't get Master Kerberos principal for use as renewer")
+        System.exit(1)
+      }
+    }
+    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+    val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val dstFs = dst.getFileSystem(conf)
+      dstFs.addDelegationTokens(delegTokenRenewer, credentials)
+    }
+    val localResources = HashMap[String, LocalResource]()
+    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+
+    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, 
+      Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
+    .foreach { case(destName, _localPath) =>
+      val localPath: String = if (_localPath != null) _localPath.trim() else ""
+      if (! localPath.isEmpty()) {
+        var localURI = new URI(localPath)
+        // If not specified, assume these are in the local filesystem to keep behavior like Hadoop.
+        if (localURI.getScheme() == null) {
+          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
+        }
+        val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
+        val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          destName, statCache)
+      }
+    }
+
+    // handle any add jars
+    if ((args.addJars != null) && (!args.addJars.isEmpty())){
+      args.addJars.split(',').foreach { case file: String =>
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          linkname, statCache, true)
+      }
+    }
+
+    // handle any distributed cache files
+    if ((args.files != null) && (!args.files.isEmpty())){
+      args.files.split(',').foreach { case file: String =>
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          linkname, statCache)
+      }
+    }
+
+    // handle any distributed cache archives
+    if ((args.archives != null) && (!args.archives.isEmpty())) {
+      args.archives.split(',').foreach { case file:String =>
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, 
+          linkname, statCache)
+      }
+    }
+
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
+    return localResources
+  }
+
+  def setupLaunchEnv(
+      localResources: HashMap[String, LocalResource], 
+      stagingDir: String): HashMap[String, String] = {
+    logInfo("Setting up the launch environment")
+    val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
+
+    val env = new HashMap[String, String]()
+
+    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    env("SPARK_YARN_MODE") = "true"
+    env("SPARK_YARN_STAGING_DIR") = stagingDir
+
+    // Set the environment variables to be passed on to the Workers.
+    distCacheMgr.setDistFilesEnv(env)
+    distCacheMgr.setDistArchivesEnv(env)
+
+    // Allow users to specify some environment variables.
+    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
+    // Add each SPARK-* key to the environment.
+    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+    env
+  }
+
+  def userArgsToString(clientArgs: ClientArguments): String = {
+    val prefix = " --args "
+    val args = clientArgs.userArgs
+    val retval = new StringBuilder()
+    for (arg <- args){
+      retval.append(prefix).append(" '").append(arg).append("' ")
+    }
+    retval.toString
+  }
+
+  def createContainerLaunchContext(
+      newApp: GetNewApplicationResponse,
+      localResources: HashMap[String, LocalResource],
+      env: HashMap[String, String]): ContainerLaunchContext = {
+    logInfo("Setting up container launch context")
+    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+    amContainer.setLocalResources(localResources)
+    amContainer.setEnvironment(env)
+
+    val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
+
+    // TODO(harvey): This can probably be a val.
+    var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
+      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
+        YarnAllocationHandler.MEMORY_OVERHEAD)
+
+    // Extra options for the JVM
+    var JAVA_OPTS = ""
+
+    // Add Xmx for am memory
+    JAVA_OPTS += "-Xmx" + amMemory + "m "
+
+    JAVA_OPTS += " -Djava.io.tmpdir=" + 
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
+    // Commenting it out for now - so that people can refer to the properties if required. Remove
+    // it once the cpuset version is pushed out. The context is that the default GC for server-class
+    // machines ends up using all cores to do GC - hence if there are multiple containers on the same
+    // node, Spark GC affects all the other containers' performance (which can also be other Spark
+    // containers). Instead of using this, rely on cpusets from YARN to enforce that Spark behaves
+    // 'properly' in multi-tenant environments. Not sure how the default Java GC behaves if it is
+    // limited to a subset of the cores on a node.
+    val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
+      java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+    if (useConcurrentAndIncrementalGC) {
+      // In our experiments, using the (default) throughput collector has severe perf ramifications in
+      // multi-tenant machines
+      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+      JAVA_OPTS += " -XX:+CMSIncrementalMode "
+      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+    }
+
+    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+    }
+
+    // Command for the ApplicationMaster
+    var javaCommand = "java"
+    val javaHome = System.getenv("JAVA_HOME")
+    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand + 
+      " -server " +
+      JAVA_OPTS +
+      " org.apache.spark.deploy.yarn.ApplicationMaster" +
+      " --class " + args.userClass + 
+      " --jar " + args.userJar +
+      userArgsToString(args) +
+      " --worker-memory " + args.workerMemory +
+      " --worker-cores " + args.workerCores +
+      " --num-workers " + args.numWorkers +
+      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+    logInfo("Command for the ApplicationMaster: " + commands(0))
+    amContainer.setCommands(commands)
+
+    val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
+    // Memory for the ApplicationMaster.
+    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    amContainer.setResource(capability)
+
+    // Setup security tokens.
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+    amContainer
+  }
+
+  def submitApp(appContext: ApplicationSubmissionContext) = {
+    // Submit the application to the applications manager.
+    logInfo("Submitting application to ASM")
+    super.submitApplication(appContext)
+  }
+
+  def monitorApplication(appId: ApplicationId): Boolean = {  
+    while (true) {
+      Thread.sleep(1000)
+      val report = super.getApplicationReport(appId)
+
+      logInfo("Application report from ASM: \n" +
+        "\t application identifier: " + appId.toString() + "\n" +
+        "\t appId: " + appId.getId() + "\n" +
+        "\t clientToken: " + report.getClientToken() + "\n" +
+        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
+        "\t appMasterHost: " + report.getHost() + "\n" +
+        "\t appQueue: " + report.getQueue() + "\n" +
+        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+        "\t appStartTime: " + report.getStartTime() + "\n" +
+        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
+        "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
+        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
+        "\t appUser: " + report.getUser()
+      )
+
+      val state = report.getYarnApplicationState()
+      val dsStatus = report.getFinalApplicationStatus()
+      if (state == YarnApplicationState.FINISHED || 
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        return true
+      }
+    }
+    true
+  }
+}
+
+object Client {
+  val SPARK_JAR: String = "spark.jar"
+  val APP_JAR: String = "app.jar"
+  val LOG4J_PROP: String = "log4j.properties"
+
+  def main(argStrings: Array[String]) {
+    // Set an env variable indicating we are running in YARN mode.
+    // Note that anything with SPARK prefix gets propagated to all (remote) processes
+    System.setProperty("SPARK_YARN_MODE", "true")
+
+    val args = new ClientArguments(argStrings)
+
+    new Client(args).run
+  }
+
+  // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
+  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
+    for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
+    }
+  }
+
+  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    // If log4j present, ensure ours overrides all others
+    if (addLog4j) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + LOG4J_PROP)
+    }
+    // Normally the user's app.jar is last, in case it conflicts with the Spark jars
+    val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+      .toBoolean
+    if (userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + APP_JAR)
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Path.SEPARATOR + SPARK_JAR)
+    Client.populateHadoopClasspath(conf, env)
+
+    if (!userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + APP_JAR)
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Path.SEPARATOR + "*")
+  }
+}
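
One behavioural detail in populateClasspath above is worth calling out: the user's app.jar normally goes after the Spark jar, but setting spark.yarn.user.classpath.first=true flips the order so user classes win. A small sketch of exercising it, assuming this snippet runs with the new module on the classpath (the HashMap simply stands in for the container environment):

    import scala.collection.mutable.HashMap
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.deploy.yarn.Client

    val env = new HashMap[String, String]()
    System.setProperty("spark.yarn.user.classpath.first", "true")
    Client.populateClasspath(new Configuration(), false /* addLog4j */, env)
    // env("CLASSPATH") now lists $PWD, then app.jar, then spark.jar,
    // then the YARN application classpath, and finally $PWD/*.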

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
new file mode 100644
index 0000000..852dbd7
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.IntParam
+import collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
+
+// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
+class ClientArguments(val args: Array[String]) {
+  var addJars: String = null
+  var files: String = null
+  var archives: String = null
+  var userJar: String = null
+  var userClass: String = null
+  var userArgs: Seq[String] = Seq[String]()
+  var workerMemory = 1024
+  var workerCores = 1
+  var numWorkers = 2
+  var amQueue = System.getProperty("QUEUE", "default")
+  var amMemory: Int = 512
+  var appName: String = "Spark"
+  // TODO
+  var inputFormatInfo: List[InputFormatInfo] = null
+
+  parseArgs(args.toList)
+
+  private def parseArgs(inputArgs: List[String]): Unit = {
+    val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
+    val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
+
+    var args = inputArgs
+
+    while (! args.isEmpty) {
+
+      args match {
+        case ("--jar") :: value :: tail =>
+          userJar = value
+          args = tail
+
+        case ("--class") :: value :: tail =>
+          userClass = value
+          args = tail
+
+        case ("--args") :: value :: tail =>
+          userArgsBuffer += value
+          args = tail
+
+        case ("--master-memory") :: MemoryParam(value) :: tail =>
+          amMemory = value
+          args = tail
+
+        case ("--num-workers") :: IntParam(value) :: tail =>
+          numWorkers = value
+          args = tail
+
+        case ("--worker-memory") :: MemoryParam(value) :: tail =>
+          workerMemory = value
+          args = tail
+
+        case ("--worker-cores") :: IntParam(value) :: tail =>
+          workerCores = value
+          args = tail
+
+        case ("--queue") :: value :: tail =>
+          amQueue = value
+          args = tail
+
+        case ("--name") :: value :: tail =>
+          appName = value
+          args = tail
+
+        case ("--addJars") :: value :: tail =>
+          addJars = value
+          args = tail
+
+        case ("--files") :: value :: tail =>
+          files = value
+          args = tail
+
+        case ("--archives") :: value :: tail =>
+          archives = value
+          args = tail
+
+        case Nil =>
+          if (userJar == null || userClass == null) {
+            printUsageAndExit(1)
+          }
+
+        case _ =>
+          printUsageAndExit(1, args)
+      }
+    }
+
+    userArgs = userArgsBuffer.readOnly
+    inputFormatInfo = inputFormatMap.values.toList
+  }
+
+
+  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+    if (unknownParam != null) {
+      System.err.println("Unknown/unsupported param " + unknownParam)
+    }
+    System.err.println(
+      "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
+      "Options:\n" +
+      "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
+      "  --class CLASS_NAME   Name of your application's main class (required)\n" +
+      "  --args ARGS          Arguments to be passed to your application's main class.\n" +
+      "                       Multiple invocations are possible; each will be passed in order.\n" +
+      "  --num-workers NUM    Number of workers to start (Default: 2)\n" +
+      "  --worker-cores NUM   Number of cores for the workers (Default: 1). This is unused right now.\n" +
+      "  --master-memory MEM  Memory for the Master (e.g. 1000M, 2G) (Default: 512 MB)\n" +
+      "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+      "  --name NAME          The name of your application (Default: Spark)\n" +
+      "  --queue QUEUE        The Hadoop queue to use for allocation requests (Default: 'default')\n" +
+      "  --addJars jars       Comma-separated list of local jars that you want SparkContext.addJar to work with.\n" +
+      "  --files files        Comma-separated list of files to be distributed with the job.\n" +
+      "  --archives archives  Comma-separated list of archives to be distributed with the job."
+      )
+    System.exit(exitCode)
+  }
+
+}
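
For context, here is a minimal sketch of how the options parsed above could be exercised; the
argument values are hypothetical, and the memory assertion assumes MemoryParam normalizes memory
strings to megabytes, consistent with the 1024/512 defaults in the class:

    import org.apache.spark.deploy.yarn.ClientArguments

    // Hypothetical argument vector; --jar and --class are the only required options.
    val argv = Array(
      "--jar", "hdfs:///user/example/app.jar",
      "--class", "org.example.Main",
      "--args", "input.txt",
      "--num-workers", "4",
      "--worker-memory", "2G")

    val parsed = new ClientArguments(argv)
    assert(parsed.numWorkers == 4)
    // Assumes MemoryParam converts "2G" to 2048 (megabytes).
    assert(parsed.workerMemory == 2048)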

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
new file mode 100644
index 0000000..5f159b0
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import org.apache.spark.Logging 
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.LinkedHashMap
+import scala.collection.mutable.Map
+
+
+/** Client side methods to setup the Hadoop distributed cache */
+class ClientDistributedCacheManager() extends Logging {
+  private val distCacheFiles: Map[String, Tuple3[String, String, String]] = 
+    LinkedHashMap[String, Tuple3[String, String, String]]()
+  private val distCacheArchives: Map[String, Tuple3[String, String, String]] = 
+    LinkedHashMap[String, Tuple3[String, String, String]]()
+
+
+  /**
+   * Add a resource to the list of distributed cache resources. This list can
+   * be sent to the ApplicationMaster and possibly the workers so that it can 
+   * be downloaded into the Hadoop distributed cache for use by this application.
+   * Adds the LocalResource to the localResources HashMap passed in and saves 
+   * the stats of the resources so they can be sent to the workers and verified.
+   *
+   * @param fs FileSystem
+   * @param conf Configuration
+   * @param destPath path to the resource
+   * @param localResources localResource hashMap to insert the resource into
+   * @param resourceType LocalResourceType 
+   * @param link link presented in the distributed cache to the destination
+   * @param statCache cache to store the file/directory stats 
+   * @param appMasterOnly Whether to only add the resource to the app master
+   */
+  def addResource(
+      fs: FileSystem,
+      conf: Configuration,
+      destPath: Path, 
+      localResources: HashMap[String, LocalResource],
+      resourceType: LocalResourceType,
+      link: String,
+      statCache: Map[URI, FileStatus],
+      appMasterOnly: Boolean = false) = {
+    val destStatus = fs.getFileStatus(destPath)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(resourceType)
+    val visibility = getVisibility(conf, destPath.toUri(), statCache)
+    amJarRsrc.setVisibility(visibility)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+    localResources(link) = amJarRsrc
+    
+    if (appMasterOnly == false) {
+      val uri = destPath.toUri()
+      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
+      if (resourceType == LocalResourceType.FILE) {
+        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      } else {
+        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      }
+    }
+  }
+
+  /**
+   * Adds the necessary cache file env variables to the env passed in
+   * @param env
+   */
+  def setDistFilesEnv(env: Map[String, String]) = {
+    val (keys, tupleValues) = distCacheFiles.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Adds the necessary cache archive env variables to the env passed in
+   * @param env
+   */
+  def setDistArchivesEnv(env: Map[String, String]) = {
+    val (keys, tupleValues) = distCacheArchives.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Returns the local resource visibility depending on the cache file permissions
+   * @param conf
+   * @param uri
+   * @param statCache
+   * @return LocalResourceVisibility
+   */
+  def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+      LocalResourceVisibility = {
+    if (isPublic(conf, uri, statCache)) {
+      return LocalResourceVisibility.PUBLIC 
+    } 
+    return LocalResourceVisibility.PRIVATE
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
+   * or not
+   * @param conf
+   * @param uri
+   * @param statCache
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+    val fs = FileSystem.get(uri, conf)
+    val current = new Path(uri.getPath())
+    //the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+      return false
+    }
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+  }
+
+  /**
+   * Returns true if all ancestors of the specified path have the 'execute'
+   * permission set for all users (i.e. that other users can traverse
+   * the directory hierarchy to the given path)
+   * @param fs
+   * @param path
+   * @param statCache
+   * @return true if all ancestors have the 'execute' permission set for all users
+   */
+  def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, 
+      statCache: Map[URI, FileStatus]): Boolean =  {
+    var current = path
+    while (current != null) {
+      //the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+        return false
+      }
+      current = current.getParent()
+    }
+    return true
+  }
+
+  /**
+   * Checks for a given path whether the Other permissions on it 
+   * imply the permission in the passed FsAction
+   * @param fs
+   * @param path
+   * @param action
+   * @param statCache
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  def checkPermissionOfOther(fs: FileSystem, path: Path,
+      action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+    val status = getFileStatus(fs, path.toUri(), statCache)
+    val perms = status.getPermission()
+    val otherAction = perms.getOtherAction()
+    if (otherAction.implies(action)) {
+      return true
+    }
+    return false
+  }
+
+  /**
+   * Checks to see if the given uri exists in the cache, if it does it 
+   * returns the existing FileStatus, otherwise it stats the uri, stores
+   * it in the cache, and returns the FileStatus.
+   * @param fs
+   * @param uri
+   * @param statCache
+   * @return FileStatus
+   */
+  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+    val stat = statCache.get(uri) match {
+      case Some(existstat) => existstat
+      case None => 
+        val newStat = fs.getFileStatus(new Path(uri))
+        statCache.put(uri, newStat)
+        newStat
+    }
+    return stat
+  }
+}
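
To make the client-side flow concrete, here is a minimal, illustrative sketch (the paths and link
name are placeholders, not part of this commit) of registering one already-uploaded file and then
exporting the SPARK_YARN_CACHE_FILES* metadata that the worker side later reads back:

    import java.net.URI
    import scala.collection.mutable.{HashMap, Map}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType}

    import org.apache.spark.deploy.yarn.ClientDistributedCacheManager

    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    val cacheManager = new ClientDistributedCacheManager()
    val localResources = new HashMap[String, LocalResource]()
    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()

    // Register a file that has already been copied to HDFS, under the link name "app.jar".
    cacheManager.addResource(fs, conf, new Path("hdfs:///tmp/spark-staging/app.jar"),
      localResources, LocalResourceType.FILE, "app.jar", statCache)

    // Write the SPARK_YARN_CACHE_FILES* variables into the launch environment.
    val env = HashMap[String, String]()
    cacheManager.setDistFilesEnv(env)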

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab8652f2/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
new file mode 100644
index 0000000..6a90cc5
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
+
+import org.apache.spark.Logging
+
+
+class WorkerRunnable(
+    container: Container,
+    conf: Configuration,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    workerMemory: Int,
+    workerCores: Int) 
+  extends Runnable with Logging {
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  var cm: ContainerManager = null
+  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  def run = {
+    logInfo("Starting Worker Container")
+    cm = connectToCM
+    startContainer
+  }
+
+  def startContainer = {
+    logInfo("Setting up ContainerLaunchContext")
+
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+
+    ctx.setContainerId(container.getId())
+    ctx.setResource(container.getResource())
+    val localResources = prepareLocalResources
+    ctx.setLocalResources(localResources)
+
+    val env = prepareEnvironment
+    ctx.setEnvironment(env)
+
+    // Extra options for the JVM
+    var JAVA_OPTS = ""
+    // Set the JVM memory
+    val workerMemoryString = workerMemory + "m"
+    JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+    }
+
+    JAVA_OPTS += " -Djava.io.tmpdir=" + 
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
+    // Commenting this out for now so that people can refer to the properties if required. Remove
+    // it once the cpuset version is pushed out.
+    // The context is: the default GC for server-class machines ends up using all cores to do GC,
+    // so if there are multiple containers on the same node, Spark's GC affects the performance of
+    // all other containers (which can also be other Spark containers).
+    // Instead of using this, rely on cpusets set up by YARN to ensure Spark behaves 'properly' in
+    // multi-tenant environments. Not sure how the default Java GC behaves if it is limited to a
+    // subset of cores on a node.
+/*
+    else {
+      // If no java_opts are specified, default to using -XX:+CMSIncrementalMode.
+      // It might be possible that other modes/config are being set in SPARK_JAVA_OPTS, so we don't
+      // want to mess with it.
+      // In our experiments, using the (default) throughput collector has severe performance
+      // ramifications on multi-tenant machines.
+      // The options are based on
+      // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+      JAVA_OPTS += " -XX:+CMSIncrementalMode "
+      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+    }
+*/
+
+    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+    var javaCommand = "java"
+    val javaHome = System.getenv("JAVA_HOME")
+    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand +
+      " -server " +
+      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+      // an inconsistent state.
+      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
+      " -XX:OnOutOfMemoryError='kill %p' " +
+      JAVA_OPTS +
+      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
+      masterAddress + " " +
+      slaveId + " " +
+      hostname + " " +
+      workerCores +
+      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+    logInfo("Setting up worker with commands: " + commands)
+    ctx.setCommands(commands)
+
+    // Send the start request to the ContainerManager
+    val startReq = Records.newRecord(classOf[StartContainerRequest])
+      .asInstanceOf[StartContainerRequest]
+    startReq.setContainerLaunchContext(ctx)
+    cm.startContainer(startReq)
+  }
+
+  private def setupDistributedCache(
+      file: String,
+      rtype: LocalResourceType,
+      localResources: HashMap[String, LocalResource],
+      timestamp: String,
+      size: String, 
+      vis: String) = {
+    val uri = new URI(file)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(rtype)
+    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+    amJarRsrc.setTimestamp(timestamp.toLong)
+    amJarRsrc.setSize(size.toLong)
+    localResources(uri.getFragment()) = amJarRsrc
+  }
+
+  def prepareLocalResources: HashMap[String, LocalResource] = {
+    logInfo("Preparing Local resources")
+    val localResources = HashMap[String, LocalResource]()
+
+    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+      for( i <- 0 to distFiles.length - 1) {
+        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+          fileSizes(i), visibilities(i))
+      }
+    }
+
+    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+      for( i <- 0 to distArchives.length - 1) {
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+          timeStamps(i), fileSizes(i), visibilities(i))
+      }
+    }
+
+    logInfo("Prepared Local resources " + localResources)
+    return localResources
+  }
+
+  def prepareEnvironment: HashMap[String, String] = {
+    val env = new HashMap[String, String]()
+
+    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+
+    // Allow users to specify some environment variables
+    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
+    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+    return env
+  }
+
+  def connectToCM: ContainerManager = {
+    val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
+    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
+    logInfo("Connecting to ContainerManager at " + cmHostPortStr)
+
+    // Use doAs and remoteUser here so we can add the container token and not pollute the current
+    // user's credentials with all of the individual container tokens.
+    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+    val containerToken = container.getContainerToken()
+    if (containerToken != null) {
+      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+    }
+
+    val proxy = user
+        .doAs(new PrivilegedExceptionAction[ContainerManager] {
+          def run: ContainerManager = {
+            return rpc.getProxy(classOf[ContainerManager],
+                cmAddress, conf).asInstanceOf[ContainerManager]
+          }
+        })
+    proxy
+  }
+
+}
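
The handoff between ClientDistributedCacheManager and prepareLocalResources above is positional:
each SPARK_YARN_CACHE_* variable is a comma-separated list, the i-th entry of every list describes
the same resource, and the link name travels in the URI fragment. A small sketch with made-up
values:

    // Parallel lists in the form written by setDistFilesEnv and read back here (example values).
    val files        = "hdfs:///tmp/app.jar#app.jar,hdfs:///tmp/data.txt#data.txt".split(',')
    val timeStamps   = "1385000000000,1385000000001".split(',')
    val fileSizes    = "1024,2048".split(',')
    val visibilities = "PRIVATE,PUBLIC".split(',')

    for (i <- 0 until files.length) {
      // Entry i of every list refers to the same distributed-cache resource.
      println(files(i) + " size=" + fileSizes(i) +
        " mtime=" + timeStamps(i) + " visibility=" + visibilities(i))
    }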