You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 08:16:41 UTC

[29/32] git commit: Using name yarn-alpha/yarn instead of yarn-2.0/yarn-2.2

Using name yarn-alpha/yarn instead of yarn-2.0/yarn-2.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ebdfa6bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ebdfa6bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ebdfa6bb

Branch: refs/heads/master
Commit: ebdfa6bb9766209bc5a3c4241fa47141c5e9c5cb
Parents: a47ebf7
Author: Raymond Liu <ra...@intel.com>
Authored: Thu Jan 2 11:23:33 2014 +0800
Committer: Raymond Liu <ra...@intel.com>
Committed: Fri Jan 3 12:14:38 2014 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   8 +-
 docs/building-with-maven.md                     |   8 +-
 pom.xml                                         |   4 +-
 project/SparkBuild.scala                        |  16 +-
 yarn/2.0/pom.xml                                | 112 ---
 .../spark/deploy/yarn/ApplicationMaster.scala   | 458 ------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 505 --------------
 .../spark/deploy/yarn/WorkerLauncher.scala      | 248 -------
 .../spark/deploy/yarn/WorkerRunnable.scala      | 235 -------
 .../deploy/yarn/YarnAllocationHandler.scala     | 680 ------------------
 yarn/2.2/pom.xml                                | 112 ---
 .../spark/deploy/yarn/ApplicationMaster.scala   | 428 ------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 523 --------------
 .../spark/deploy/yarn/WorkerLauncher.scala      | 227 ------
 .../spark/deploy/yarn/WorkerRunnable.scala      | 209 ------
 .../deploy/yarn/YarnAllocationHandler.scala     | 694 -------------------
 yarn/README.md                                  |   8 +-
 yarn/alpha/pom.xml                              | 112 +++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 458 ++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 505 ++++++++++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 248 +++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      | 235 +++++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 680 ++++++++++++++++++
 yarn/pom.xml                                    |   8 +-
 yarn/stable/pom.xml                             | 112 +++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 428 ++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 523 ++++++++++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 227 ++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      | 209 ++++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 694 +++++++++++++++++++
 30 files changed, 4457 insertions(+), 4457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index bd94834..079509b 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -124,21 +124,21 @@
 
   <profiles>
     <profile>
-      <id>hadoop2-yarn</id>
+      <id>yarn-alpha</id>
       <dependencies>
         <dependency>
           <groupId>org.apache.spark</groupId>
-          <artifactId>spark-yarn-2.0_${scala.binary.version}</artifactId>
+          <artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
       </dependencies>
     </profile>
     <profile>
-      <id>hadoop2.2-yarn</id>
+      <id>yarn</id>
       <dependencies>
         <dependency>
           <groupId>org.apache.spark</groupId>
-          <artifactId>spark-yarn-2.2_${scala.binary.version}</artifactId>
+          <artifactId>spark-yarn_${scala.binary.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
       </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/docs/building-with-maven.md
----------------------------------------------------------------------
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index 699b10b..b9ff0af 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -37,16 +37,16 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit
     # Cloudera CDH 4.2.0 with MapReduce v1
     $ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
 
-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "hadoop2-yarn" or "hadoop2.2-yarn" profile and set the "hadoop.version", "yarn.version" property:
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "yarn-alpha" or "yarn" profile and set the "hadoop.version" and "yarn.version" properties:
 
     # Apache Hadoop 2.0.5-alpha
-    $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
+    $ mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
 
     # Cloudera CDH 4.2.0 with MapReduce v2
-    $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package
+    $ mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-cdh4.2.0 -DskipTests clean package
 
     # Apache Hadoop 2.2.X ( e.g. 2.2.0 as below ) and newer
-    $ mvn -Phadoop2.2-yarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package
+    $ mvn -Pyarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package
 
 ## Spark Tests in Maven ##
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2c1f6d..aa2f076 100644
--- a/pom.xml
+++ b/pom.xml
@@ -722,7 +722,7 @@
 
   <profiles>
     <profile>
-      <id>hadoop2-yarn</id>
+      <id>yarn-alpha</id>
       <properties>
         <hadoop.major.version>2</hadoop.major.version>
         <!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
@@ -738,7 +738,7 @@
     </profile>
 
     <profile>
-      <id>hadoop2.2-yarn</id>
+      <id>yarn</id>
       <properties>
         <hadoop.major.version>2</hadoop.major.version>
         <hadoop.version>2.2.0</hadoop.version>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bc9c02d..b0749cc 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -85,11 +85,11 @@ object SparkBuild extends Build {
   }
 
   // Conditionally include the yarn sub-project
-  lazy val yarn20 = Project("yarn2-alpha", file("yarn/2.0"), settings = yarn20Settings) dependsOn(core)
-  lazy val yarn22 = Project("yarn2-stable", file("yarn/2.2"), settings = yarn22Settings) dependsOn(core)
+  lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
+  lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
 
-  lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn22 else yarn20) else Seq[ClasspathDependency]()
-  lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn22 else yarn20) else Seq[ProjectReference]()
+  lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]()
+  lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]()
 
   // Everything except assembly, tools and examples belong to packageProjects
   lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
@@ -334,12 +334,12 @@ object SparkBuild extends Build {
 
   ) ++ extraYarnSettings
 
-  def yarn20Settings = yarnCommonSettings ++ Seq(
-    name := "spark-yarn-2.0"
+  def yarnAlphaSettings = yarnCommonSettings ++ Seq(
+    name := "spark-yarn-alpha"
   )
 
-  def yarn22Settings = yarnCommonSettings ++ Seq(
-    name := "spark-yarn-2.2"
+  def yarnSettings = yarnCommonSettings ++ Seq(
+    name := "spark-yarn"
   )
 
   // Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/2.0/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/2.0/pom.xml b/yarn/2.0/pom.xml
deleted file mode 100644
index ec6738f..0000000
--- a/yarn/2.0/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-yarn-aggregator_2.10</artifactId>
-    <version>0.9.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn-2.0_2.10</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project YARN 2.0</name>
-  <url>http://spark.incubator.apache.org/</url>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-           <id>add-scala-sources</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/main/scala</source>
-                <source>../common/src/main/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-          <execution>
-            <id>add-scala-test-sources</id>
-            <phase>generate-test-sources</phase>
-            <goals>
-              <goal>add-test-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/test/scala</source>
-                <source>../common/src/test/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>test</phase>
-            <goals>
-              <goal>run</goal>
-            </goals>
-            <configuration>
-              <exportAntProperties>true</exportAntProperties>
-              <tasks>
-                <property name="spark.classpath" refid="maven.test.classpath" />
-                <property environment="env" />
-                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">  
-                  <condition>
-                    <not>
-                      <or>
-                        <isset property="env.SCALA_HOME" />
-                        <isset property="env.SCALA_LIBRARY_PATH" />
-                      </or>
-                    </not>
-                  </condition>
-                </fail>
-              </tasks>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <configuration>
-          <environmentVariables>
-            <SPARK_HOME>${basedir}/../..</SPARK_HOME>
-            <SPARK_TESTING>1</SPARK_TESTING>
-            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
-          </environmentVariables>
-        </configuration>
-      </plugin>
-    </plugins>
-
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 7cf120d..0000000
--- a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.net.Socket
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, SparkContext, Logging}
-import org.apache.spark.util.Utils
-
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
-
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
-  private var rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = _
-  private var appAttemptId: ApplicationAttemptId = _
-  private var userThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  private val fs = FileSystem.get(yarnConf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var isFinished: Boolean = false
-  private var uiAddress: String = _
-  private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
-    YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
-  private var isLastAMRetry: Boolean = true
-
-  private val sparkConf = new SparkConf()
-  // Default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3))
-
-  def run() {
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
-    // set the web ui port to be ephemeral for yarn so we don't conflict with
-    // other spark processes running on the same box
-    System.setProperty("spark.ui.port", "0")
-
-    // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
-    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
-    appAttemptId = getApplicationAttemptId()
-    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
-    resourceManager = registerWithResourceManager()
-
-    // Workaround until hadoop moves to something which has
-    // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
-    // ignore result.
-    // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
-    // Hence args.workerCores = numCore disabled above. Any better option?
-
-    // Compute number of threads for akka
-    //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-    //if (minimumMemory > 0) {
-    //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    //  val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-    //  if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-    //  }
-    //}
-    // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-
-    ApplicationMaster.register(this)
-    // Start the user's JAR
-    userThread = startUserClass()
-
-    // This a bit hacky, but we need to wait until the spark.driver.port property has
-    // been set by the Thread executing the user class.
-    waitForSparkContextInitialized()
-
-    // Do this after spark master is up and SparkContext is created so that we can register UI Url
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Allocate all containers
-    allocateWorkers()
-
-    // Wait for the user class to Finish
-    userThread.join()
-
-    System.exit(0)
-  }
-
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-        .getOrElse(""))
-
-    if (localDirs.isEmpty()) {
-      throw new Exception("Yarn Local dirs can't be empty")
-    }
-    localDirs
-  }
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
-  private def registerWithResourceManager(): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some
-    // sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    appMasterRequest.setTrackingUrl(uiAddress)
-    resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-  private def startUserClass(): Thread  = {
-    logInfo("Starting the user JAR in a separate Thread")
-    val mainMethod = Class.forName(
-      args.userClass,
-      false /* initialize */,
-      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
-    val t = new Thread {
-      override def run() {
-        var successed = false
-        try {
-          // Copy
-          var mainArgs: Array[String] = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
-          // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
-          // userThread will stop here unless it has uncaught exception thrown out
-          // It need shutdown hook to set SUCCEEDED
-          successed = true
-        } finally {
-          logDebug("finishing main")
-          isLastAMRetry = true
-          if (successed) {
-            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-          } else {
-            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
-          }
-        }
-      }
-    }
-    t.start()
-    t
-  }
-
-  // this need to happen before allocateWorkers
-  private def waitForSparkContextInitialized() {
-    logInfo("Waiting for spark context initialization")
-    try {
-      var sparkContext: SparkContext = null
-      ApplicationMaster.sparkContextRef.synchronized {
-        var count = 0
-        val waitTime = 10000L
-        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
-          logInfo("Waiting for spark context initialization ... " + count)
-          count = count + 1
-          ApplicationMaster.sparkContextRef.wait(waitTime)
-        }
-        sparkContext = ApplicationMaster.sparkContextRef.get()
-        assert(sparkContext != null || count >= numTries)
-
-        if (null != sparkContext) {
-          uiAddress = sparkContext.ui.appUIAddress
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(
-            yarnConf,
-            resourceManager,
-            appAttemptId,
-            args,
-            sparkContext.preferredNodeLocationData,
-            sparkContext.getConf)
-        } else {
-          logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
-            format(count * waitTime, numTries))
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(
-            yarnConf,
-            resourceManager,
-            appAttemptId,
-            args, 
-            sparkContext.getConf)
-        }
-      }
-    } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
-    }
-  }
-
-  private def allocateWorkers() {
-    try {
-      logInfo("Allocating " + args.numWorkers + " workers.")
-      // Wait until all containers have finished
-      // TODO: This is a bit ugly. Can we make it nicer?
-      // TODO: Handle container failure
-
-      // Exists the loop if the user thread exits.
-      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
-        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of worker failures reached")
-        }
-        yarnAllocator.allocateContainers(
-          math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
-        ApplicationMaster.incrementAllocatorLoop(1)
-        Thread.sleep(100)
-      }
-    } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
-    }
-    logInfo("All workers have launched.")
-
-    // Launch a progress reporter thread, else the app will get killed after expiration
-    // (def: 10mins) timeout.
-    // TODO(harvey): Verify the timeout
-    if (userThread.isAlive) {
-      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-      // we want to be reasonably responsive without causing too many requests to RM.
-      val schedulerInterval =
-        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-      // must be <= timeoutInterval / 2.
-      val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-      launchReporterThread(interval)
-    }
-  }
-
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (userThread.isAlive) {
-          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of worker failures reached")
-          }
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.allocateContainers(missingWorkerCount)
-          }
-          else sendProgress()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // Setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // Simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateContainers(0)
-  }
-
-  /*
-  def printContainers(containers: List[Container]) = {
-    for (container <- containers) {
-      logInfo("Launching shell command on a new container."
-        + ", containerId=" + container.getId()
-        + ", containerNode=" + container.getNodeId().getHost()
-        + ":" + container.getNodeId().getPort()
-        + ", containerNodeURI=" + container.getNodeHttpAddress()
-        + ", containerState" + container.getState()
-        + ", containerResourceMemory"
-        + container.getResource().getMemory())
-    }
-  }
-  */
-
-  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
-    synchronized {
-      if (isFinished) {
-        return
-      }
-      isFinished = true
-    }
-
-    logInfo("finishApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    // Set tracking url to empty since we don't have a history server.
-    finishReq.setTrackingUrl("")
-    resourceManager.finishApplicationMaster(finishReq)
-  }
-
-  /**
-   * Clean up the staging directory.
-   */
-  private def cleanupStagingDir() {
-    var stagingDirPath: Path = null
-    try {
-      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
-      if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
-        if (stagingDirPath == null) {
-          logError("Staging directory is null")
-          return
-        }
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
-      }
-    } catch {
-      case ioe: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
-    }
-  }
-
-  // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
-  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
-    def run() {
-      logInfo("AppMaster received a signal.")
-      // we need to clean up staging dir before HDFS is shut down
-      // make sure we don't delete it until this is the last AM
-      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
-    }
-  }
-}
-
-object ApplicationMaster {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
-    }
-  }
-
-  private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
-  def register(master: ApplicationMaster) {
-    applicationMasters.add(master)
-  }
-
-  val sparkContextRef: AtomicReference[SparkContext] =
-    new AtomicReference[SparkContext](null /* initialValue */)
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
-
-  def sparkContextInitialized(sc: SparkContext): Boolean = {
-    var modified = false
-    sparkContextRef.synchronized {
-      modified = sparkContextRef.compareAndSet(null, sc)
-      sparkContextRef.notifyAll()
-    }
-
-    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
-    // System.exit.
-    // Should not really have to do this, but it helps YARN to evict resources earlier.
-    // Not to mention, prevent the Client from declaring failure even though we exited properly.
-    // Note that this will unfortunately not properly clean up the staging files because it gets
-    // called too late, after the filesystem is already shutdown.
-    if (modified) {
-      Runtime.getRuntime().addShutdownHook(new Thread with Logging {
-        // This is not only logs, but also ensures that log system is initialized for this instance
-        // when we are actually 'run'-ing.
-        logInfo("Adding shutdown hook for context " + sc)
-        override def run() {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          // Best case ...
-          for (master <- applicationMasters) {
-            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-          }
-        }
-      } )
-    }
-
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
-      }
-    }
-    modified
-  }
-
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index 2bd047c..0000000
--- a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.{InetAddress, UnknownHostException, URI}
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.YarnClientImpl
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
-
-
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
-
-  def this(args: ClientArguments) = this(new Configuration(), args)
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-  private val SPARK_STAGING: String = ".sparkStaging"
-  private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
-
-  // Staging directory is private! -> rwx--------
-  val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
-
-  // App files are world-wide readable and owner writable -> rw-r--r--
-  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
-
-  // for client user who want to monitor app status by itself.
-  def runApp() = {
-    validateArgs()
-
-    init(yarnConf)
-    start()
-    logClusterResourceDetails()
-
-    val newApp = super.getNewApplication()
-    val appId = newApp.getApplicationId()
-
-    verifyClusterResources(newApp)
-    val appContext = createApplicationSubmissionContext(appId)
-    val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val env = setupLaunchEnv(localResources, appStagingDir)
-    val amContainer = createContainerLaunchContext(newApp, localResources, env)
-
-    appContext.setQueue(args.amQueue)
-    appContext.setAMContainerSpec(amContainer)
-    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
-    submitApp(appContext)
-    appId
-  }
-
-  def run() {
-    val appId = runApp()
-    monitorApplication(appId)
-    System.exit(0)
-  }
-
-  def validateArgs() = {
-    Map(
-      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
-      (args.userJar == null) -> "Error: You must specify a user jar!",
-      (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
-        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
-    ).foreach { case(cond, errStr) =>
-      if (cond) {
-        logError(errStr)
-        args.printUsageAndExit(1)
-      }
-    }
-  }
-
-  def getAppStagingDir(appId: ApplicationId): String = {
-    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
-  }
-
-  def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
-    logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
-      clusterMetrics.getNumNodeManagers)
-
-    val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
-      queueApplicationCount = %s, queueChildQueueCount = %s""".format(
-        queueInfo.getQueueName,
-        queueInfo.getCurrentCapacity,
-        queueInfo.getMaximumCapacity,
-        queueInfo.getApplications.size,
-        queueInfo.getChildQueues.size))
-  }
-
-  def verifyClusterResources(app: GetNewApplicationResponse) = {
-    val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
-    // If we have requested more then the clusters max for a single resource then exit.
-    if (args.workerMemory > maxMem) {
-      logError("the worker size is to large to run on this cluster " + args.workerMemory)
-      System.exit(1)
-    }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    if (amMem > maxMem) {
-      logError("AM size is to large to run on this cluster "  + amMem)
-      System.exit(1)
-    }
-
-    // We could add checks to make sure the entire cluster has enough resources but that involves
-    // getting all the node reports and computing ourselves
-  }
-
-  def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
-    logInfo("Setting up application submission context for ASM")
-    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    appContext.setApplicationId(appId)
-    appContext.setApplicationName(args.appName)
-    return appContext
-  }
-
-  /** See if two file systems are the same or not. */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
-    if (srcUri.getScheme() == null) {
-      return false
-    }
-    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
-      return false
-    }
-    var srcHost = srcUri.getHost()
-    var dstHost = dstUri.getHost()
-    if ((srcHost != null) && (dstHost != null)) {
-      try {
-        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
-        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
-      } catch {
-        case e: UnknownHostException =>
-          return false
-      }
-      if (!srcHost.equals(dstHost)) {
-        return false
-      }
-    } else if (srcHost == null && dstHost != null) {
-      return false
-    } else if (srcHost != null && dstHost == null) {
-      return false
-    }
-    //check for ports
-    if (srcUri.getPort() != dstUri.getPort()) {
-      return false
-    }
-    return true
-  }
-
-  /** Copy the file into HDFS if needed. */
-  private def copyRemoteFile(
-      dstDir: Path,
-      originalPath: Path,
-      replication: Short,
-      setPerms: Boolean = false): Path = {
-    val fs = FileSystem.get(conf)
-    val remoteFs = originalPath.getFileSystem(conf)
-    var newPath = originalPath
-    if (! compareFs(remoteFs, fs)) {
-      newPath = new Path(dstDir, originalPath.getName())
-      logInfo("Uploading " + originalPath + " to " + newPath)
-      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
-      fs.setReplication(newPath, replication)
-      if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
-    }
-    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
-    // version shows the specific version in the distributed cache configuration
-    val qualPath = fs.makeQualified(newPath)
-    val fc = FileContext.getFileContext(qualPath.toUri(), conf)
-    val destPath = fc.resolvePath(qualPath)
-    destPath
-  }
-
-  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
-    // local resources to the AM.
-    val fs = FileSystem.get(conf)
-
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    if (UserGroupInformation.isSecurityEnabled()) {
-      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
-      }
-    }
-    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val dstFs = dst.getFileSystem(conf)
-      dstFs.addDelegationTokens(delegTokenRenewer, credentials)
-    }
-    val localResources = HashMap[String, LocalResource]()
-    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
-
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-
-    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
-      Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
-    .foreach { case(destName, _localPath) =>
-      val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (! localPath.isEmpty()) {
-        var localURI = new URI(localPath)
-        // if not specified assume these are in the local filesystem to keep behavior like Hadoop
-        if (localURI.getScheme() == null) {
-          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
-        }
-        val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
-      }
-    }
-
-    // handle any add jars
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, true)
-      }
-    }
-
-    // handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
-      }
-    }
-
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
-    return localResources
-  }
-
-  def setupLaunchEnv(
-      localResources: HashMap[String, LocalResource],
-      stagingDir: String): HashMap[String, String] = {
-    logInfo("Setting up the launch environment")
-    val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
-
-    val env = new HashMap[String, String]()
-
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
-    env("SPARK_YARN_MODE") = "true"
-    env("SPARK_YARN_STAGING_DIR") = stagingDir
-
-    // Set the environment variables to be passed on to the Workers.
-    distCacheMgr.setDistFilesEnv(env)
-    distCacheMgr.setDistArchivesEnv(env)
-
-    // Allow users to specify some environment variables.
-    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
-    // Add each SPARK-* key to the environment.
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    env
-  }
-
-  def userArgsToString(clientArgs: ClientArguments): String = {
-    val prefix = " --args "
-    val args = clientArgs.userArgs
-    val retval = new StringBuilder()
-    for (arg <- args){
-      retval.append(prefix).append(" '").append(arg).append("' ")
-    }
-    retval.toString
-  }
-
-  def createContainerLaunchContext(
-      newApp: GetNewApplicationResponse,
-      localResources: HashMap[String, LocalResource],
-      env: HashMap[String, String]): ContainerLaunchContext = {
-    logInfo("Setting up container launch context")
-    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
-    amContainer.setLocalResources(localResources)
-    amContainer.setEnvironment(env)
-
-    val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
-
-    // TODO(harvey): This can probably be a val.
-    var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
-      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-        YarnAllocationHandler.MEMORY_OVERHEAD)
-
-    // Extra options for the JVM
-    var JAVA_OPTS = ""
-
-    // Add Xmx for am memory
-    JAVA_OPTS += "-Xmx" + amMemory + "m "
-
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove
-    // it once cpuset version is pushed out. The context is, default gc for server class machines
-    // end up using all cores to do gc - hence if there are multiple containers in same node,
-    // spark gc effects all other containers performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
-    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
-    // of cores on a node.
-    val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
-      java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
-    if (useConcurrentAndIncrementalGC) {
-      // In our expts, using (default) throughput collector has severe perf ramnifications in
-      // multi-tenant machines
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
-    }
-
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
-    }
-
-    // Command for the ApplicationMaster
-    var javaCommand = "java"
-    val javaHome = System.getenv("JAVA_HOME")
-    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
-      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
-    }
-
-    val commands = List[String](javaCommand +
-      " -server " +
-      JAVA_OPTS +
-      " " + args.amClass +
-      " --class " + args.userClass +
-      " --jar " + args.userJar +
-      userArgsToString(args) +
-      " --worker-memory " + args.workerMemory +
-      " --worker-cores " + args.workerCores +
-      " --num-workers " + args.numWorkers +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-    logInfo("Command for the ApplicationMaster: " + commands(0))
-    amContainer.setCommands(commands)
-
-    val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-    amContainer.setResource(capability)
-
-    // Setup security tokens.
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
-    amContainer
-  }
-
-  def submitApp(appContext: ApplicationSubmissionContext) = {
-    // Submit the application to the applications manager.
-    logInfo("Submitting application to ASM")
-    super.submitApplication(appContext)
-  }
-
-  def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
-    while (true) {
-      Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t application identifier: " + appId.toString() + "\n" +
-        "\t appId: " + appId.getId() + "\n" +
-        "\t clientToken: " + report.getClientToken() + "\n" +
-        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
-        "\t appMasterHost: " + report.getHost() + "\n" +
-        "\t appQueue: " + report.getQueue() + "\n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
-        "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
-        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
-        "\t appUser: " + report.getUser()
-      )
-
-      val state = report.getYarnApplicationState()
-      val dsStatus = report.getFinalApplicationStatus()
-      if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        return true
-      }
-    }
-    true
-  }
-}
-
-object Client {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
-  val LOG4J_PROP: String = "log4j.properties"
-
-  def main(argStrings: Array[String]) {
-    // Set an env variable indicating we are running in YARN mode.
-    // Note that anything with SPARK prefix gets propagated to all (remote) processes
-    System.setProperty("SPARK_YARN_MODE", "true")
-
-    val args = new ClientArguments(argStrings)
-
-    new Client(args).run
-  }
-
-  // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
-    for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
-    }
-  }
-
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP)
-    }
-    // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
-    if (userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR)
-    }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR)
-    Client.populateHadoopClasspath(conf, env)
-
-    if (!userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR)
-    }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
deleted file mode 100644
index e645307..0000000
--- a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.SplitInfo
-
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
-
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
-  private val rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = _
-  private var appAttemptId: ApplicationAttemptId = _
-  private var reporterThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
-  private val sparkConf = new SparkConf
-
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf)._1
-  var actor: ActorRef = _
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-    }
-  }
-
-  def run() {
-
-    appAttemptId = getApplicationAttemptId()
-    resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-      }
-    }
-
-    waitForSparkMaster()
-
-    // Allocate all containers
-    allocateWorkers()
-
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
-    reporterThread = launchReporterThread(interval)
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    return appAttemptId
-  }
-
-  private def registerWithResourceManager(): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
-    return resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
-
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-
-  private def allocateWorkers() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
-      args, preferredNodeLocationData, sparkConf)
-
-    logInfo("Allocating " + args.numWorkers + " workers.")
-    // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
-      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
-      Thread.sleep(100)
-    }
-
-    logInfo("All workers have launched.")
-
-  }
-
-  // TODO: We might want to extend this to allocate more containers in case they die !
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed) {
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
-            yarnAllocator.allocateContainers(missingWorkerCount)
-          }
-          else sendProgress()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    return t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateContainers(0)
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    resourceManager.finishApplicationMaster(finishReq)
-  }
-
-}
-
-
-object WorkerLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index 4f34bd9..0000000
--- a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-
-import org.apache.spark.Logging
-
-
-class WorkerRunnable(
-    container: Container,
-    conf: Configuration,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    workerMemory: Int,
-    workerCores: Int) 
-  extends Runnable with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var cm: ContainerManager = _
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Worker Container")
-    cm = connectToCM
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    ctx.setContainerId(container.getId())
-    ctx.setResource(container.getResource())
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    val env = prepareEnvironment
-    ctx.setEnvironment(env)
-
-    // Extra options for the JVM
-    var JAVA_OPTS = ""
-    // Set the JVM memory
-    val workerMemoryString = workerMemory + "m"
-    JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
-    }
-
-    JAVA_OPTS += " -Djava.io.tmpdir=" + 
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove
-    // it once cpuset version is pushed out.
-    // The context is, default gc for server class machines end up using all cores to do gc - hence
-    // if there are multiple containers in same node, spark gc effects all other containers
-    // performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
-    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
-    // of cores on a node.
-/*
-    else {
-      // If no java_opts specified, default to using -XX:+CMSIncrementalMode
-      // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
-      // want to mess with it.
-      // In our expts, using (default) throughput collector has severe perf ramnifications in
-      // multi-tennent machines
-      // The options are based on
-      // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
-    }
-*/
-
-    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
-    var javaCommand = "java"
-    val javaHome = System.getenv("JAVA_HOME")
-    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
-      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
-    }
-
-    val commands = List[String](javaCommand +
-      " -server " +
-      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
-      // an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
-      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      workerCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-    logInfo("Setting up worker with commands: " + commands)
-    ctx.setCommands(commands)
-
-    // Send the start request to the ContainerManager
-    val startReq = Records.newRecord(classOf[StartContainerRequest])
-    .asInstanceOf[StartContainerRequest]
-    startReq.setContainerLaunchContext(ctx)
-    cm.startContainer(startReq)
-  }
-
-  private def setupDistributedCache(
-      file: String,
-      rtype: LocalResourceType,
-      localResources: HashMap[String, LocalResource],
-      timestamp: String,
-      size: String, 
-      vis: String) = {
-    val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
-    amJarRsrc.setTimestamp(timestamp.toLong)
-    amJarRsrc.setSize(size.toLong)
-    localResources(uri.getFragment()) = amJarRsrc
-  }
-
-  def prepareLocalResources: HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    val localResources = HashMap[String, LocalResource]()
-
-    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
-      for( i <- 0 to distFiles.length - 1) {
-        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i), visibilities(i))
-      }
-    }
-
-    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
-      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
-      for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
-          timeStamps(i), fileSizes(i), visibilities(i))
-      }
-    }
-
-    logInfo("Prepared Local resources " + localResources)
-    return localResources
-  }
-
-  def prepareEnvironment: HashMap[String, String] = {
-    val env = new HashMap[String, String]()
-
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
-
-    // Allow users to specify some environment variables
-    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    return env
-  }
-
-  def connectToCM: ContainerManager = {
-    val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
-    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
-    logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
-    // Use doAs and remoteUser here so we can add the container token and not pollute the current
-    // users credentials with all of the individual container tokens
-    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
-    val containerToken = container.getContainerToken()
-    if (containerToken != null) {
-      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
-    }
-
-    val proxy = user
-        .doAs(new PrivilegedExceptionAction[ContainerManager] {
-          def run: ContainerManager = {
-            return rpc.getProxy(classOf[ContainerManager],
-                cmAddress, conf).asInstanceOf[ContainerManager]
-          }
-        })
-    proxy
-  }
-
-}