You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 08:16:17 UTC

[05/32] Reorganize yarn related codes into sub projects to remove duplicate files.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index 2bd047c..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.{InetAddress, UnknownHostException, URI}
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.YarnClientImpl
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
-
-
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
-
-  /** Convenience constructor that builds the client on top of a fresh Hadoop `Configuration`. */
-  def this(args: ClientArguments) = this(new Configuration(), args)
-
-  // RPC factory created from the Hadoop configuration passed to the constructor.
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  // YARN view of the same configuration; passed to init() in runApp().
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  // Credentials of the current user, captured once when the client is constructed.
-  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-  // Name of the staging root; getAppStagingDir() appends the application id to it.
-  private val SPARK_STAGING: String = ".sparkStaging"
-  // Tracks the files and archives registered as local resources for the application.
-  private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
-
-  // The staging directory is private to the submitting user -> rwx------ (octal 0700)
-  val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
-
-  // App files are world readable and owner writable -> rw-r--r-- (octal 0644)
-  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
-
-  /**
-   * Submit the application to YARN without waiting for it to finish. Intended for callers that
-   * want to monitor the application status themselves.
-   *
-   * @return the application id obtained from the new-application response
-   */
-  def runApp() = {
-    validateArgs()
-
-    // Initialise and start the underlying YARN client before issuing any request.
-    init(yarnConf)
-    start()
-    logClusterResourceDetails()
-
-    val newAppResponse = super.getNewApplication()
-    val applicationId = newAppResponse.getApplicationId()
-    verifyClusterResources(newAppResponse)
-
-    // Assemble everything the ApplicationMaster container needs.
-    val submissionContext = createApplicationSubmissionContext(applicationId)
-    val stagingDir = getAppStagingDir(applicationId)
-    val resources = prepareLocalResources(stagingDir)
-    val launchEnv = setupLaunchEnv(resources, stagingDir)
-    val amContainerSpec = createContainerLaunchContext(newAppResponse, resources, launchEnv)
-
-    submissionContext.setQueue(args.amQueue)
-    submissionContext.setAMContainerSpec(amContainerSpec)
-    submissionContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
-    submitApp(submissionContext)
-    applicationId
-  }
-
-  /** Submit the application, poll it until it reaches a final state, then exit the JVM with 0. */
-  def run(): Unit = {
-    val submittedAppId = runApp()
-    monitorApplication(submittedAppId)
-    System.exit(0)
-  }
-
-  /**
-   * Check the client arguments and the environment before anything is submitted to YARN.
-   *
-   * Each check pairs a failure condition with the message to report. A failing check is logged
-   * and `args.printUsageAndExit(1)` is invoked, which prints the usage text and exits.
-   */
-  def validateArgs() = {
-    // Keep the checks in a Seq of pairs, not a Map: the conditions are Booleans, so as Map keys
-    // they collapse into at most two entries and most of the messages would be silently dropped.
-    Seq(
-      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
-      (args.userJar == null) -> "Error: You must specify a user jar!",
-      (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
-        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
-    ).foreach { case (failed, errStr) =>
-      if (failed) {
-        logError(errStr)
-        args.printUsageAndExit(1)
-      }
-    }
-  }
-
-  /**
-   * Build the staging directory path for an application: the staging root, then the application
-   * id, each followed by a path separator.
-   */
-  def getAppStagingDir(appId: ApplicationId): String = {
-    Seq(SPARK_STAGING, appId.toString()).mkString("", Path.SEPARATOR, Path.SEPARATOR)
-  }
-
-  /** Log the number of node managers in the cluster and the details of the target queue. */
-  def logClusterResourceDetails(): Unit = {
-    val metrics: YarnClusterMetrics = super.getYarnClusterMetrics
-    logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
-      metrics.getNumNodeManagers)
-
-    // The queue looked up is the one the ApplicationMaster will be submitted to.
-    val amQueueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
-      queueApplicationCount = %s, queueChildQueueCount = %s""".format(
-        amQueueInfo.getQueueName,
-        amQueueInfo.getCurrentCapacity,
-        amQueueInfo.getMaximumCapacity,
-        amQueueInfo.getApplications.size,
-        amQueueInfo.getChildQueues.size))
-  }
-
-  /**
-   * Exit with status 1 if a worker or the ApplicationMaster would need more memory than the
-   * cluster allows for a single resource.
-   *
-   * @param app response to the new-application request; supplies the cluster's maximum
-   *            resource capability
-   */
-  def verifyClusterResources(app: GetNewApplicationResponse) = {
-    val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capability of a single resource in this cluster " + maxMem)
-
-    // Compare the size that is actually requested: the configured memory plus MEMORY_OVERHEAD.
-    // NOTE(review): assumes worker containers are requested with the same overhead as the AM
-    // container (see createContainerLaunchContext) - confirm against YarnAllocationHandler.
-    val workerMem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    if (workerMem > maxMem) {
-      logError("the worker size is too large to run on this cluster " + workerMem)
-      System.exit(1)
-    }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    if (amMem > maxMem) {
-      logError("AM size is too large to run on this cluster "  + amMem)
-      System.exit(1)
-    }
-
-    // We could add checks to make sure the entire cluster has enough resources but that involves
-    // getting all the node reports and computing ourselves
-  }
-
-  /**
-   * Create a submission context carrying the application id and the configured application name.
-   * Queue, user and AM container spec are not set here.
-   */
-  def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
-    logInfo("Setting up application submission context for ASM")
-    val submissionContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    submissionContext.setApplicationId(appId)
-    submissionContext.setApplicationName(args.appName)
-    submissionContext
-  }
-
-  /**
-   * Decide whether two file systems are the same one.
-   *
-   * They are the same when the source scheme is non-null and equals the destination scheme,
-   * the hosts are both absent or resolve to the same canonical host name, and the ports match.
-   * A host that cannot be resolved makes the file systems count as different.
-   */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
-
-    def sameScheme: Boolean =
-      srcUri.getScheme() != null && srcUri.getScheme().equals(dstUri.getScheme())
-
-    def sameHost: Boolean = (srcUri.getHost(), dstUri.getHost()) match {
-      case (null, null) => true
-      case (null, _) | (_, null) => false
-      case (rawSrcHost, rawDstHost) =>
-        try {
-          val canonicalSrc = InetAddress.getByName(rawSrcHost).getCanonicalHostName()
-          val canonicalDst = InetAddress.getByName(rawDstHost).getCanonicalHostName()
-          canonicalSrc.equals(canonicalDst)
-        } catch {
-          case e: UnknownHostException => false
-        }
-    }
-
-    // check for ports last, once scheme and host already agree
-    sameScheme && sameHost && srcUri.getPort() == dstUri.getPort()
-  }
-
-  /**
-   * Copy a file into the default file system unless it already lives there, and return its
-   * fully resolved path.
-   *
-   * @param dstDir directory that receives the copy
-   * @param originalPath file to make available
-   * @param replication replication factor applied to a copied file
-   * @param setPerms when true, a copied file is given `APP_FILE_PERMISSION`
-   * @return the qualified path with symlinks resolved
-   */
-  private def copyRemoteFile(
-      dstDir: Path,
-      originalPath: Path,
-      replication: Short,
-      setPerms: Boolean = false): Path = {
-    val targetFs = FileSystem.get(conf)
-    val sourceFs = originalPath.getFileSystem(conf)
-    val effectivePath =
-      if (compareFs(sourceFs, targetFs)) {
-        // Already on the target file system: nothing to upload.
-        originalPath
-      } else {
-        val uploaded = new Path(dstDir, originalPath.getName())
-        logInfo("Uploading " + originalPath + " to " + uploaded)
-        FileUtil.copy(sourceFs, originalPath, targetFs, uploaded, false, conf)
-        targetFs.setReplication(uploaded, replication)
-        if (setPerms) targetFs.setPermission(uploaded, new FsPermission(APP_FILE_PERMISSION))
-        uploaded
-      }
-    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
-    // version shows the specific version in the distributed cache configuration
-    val qualified = targetFs.makeQualified(effectivePath)
-    FileContext.getFileContext(qualified.toUri(), conf).resolvePath(qualified)
-  }
-
-  /**
-   * Stage everything the application needs and register it as YARN local resources.
-   *
-   * Creates the staging directory under the user's home directory, uploads the Spark jar, the
-   * application jar and the optional log4j configuration when they are not already on the
-   * default file system, then does the same for the `--addJars`, `--files` and `--archives`
-   * lists. When security is enabled, delegation tokens for the staging file system are added
-   * to `credentials`; the process exits with status 1 if no renewer principal is available.
-   *
-   * @param appStagingDir staging directory, relative to the user's home directory
-   * @return map from link name to the local resource registered under that name
-   */
-  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
-    // local resources to the AM.
-    val fs = FileSystem.get(conf)
-
-    // The renewer principal is only mandatory on a secure cluster.
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    if (UserGroupInformation.isSecurityEnabled()) {
-      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
-      }
-    }
-    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
-    // Collect delegation tokens for the file system that holds the staging directory.
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val dstFs = dst.getFileSystem(conf)
-      dstFs.addDelegationTokens(delegTokenRenewer, credentials)
-    }
-    val localResources = HashMap[String, LocalResource]()
-    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
-
-    // Shared across all addResource calls below.
-    // NOTE(review): presumably caches file status lookups per URI - confirm in
-    // ClientDistributedCacheManager.
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-
-    // Fixed-name resources: link name -> configured location (null or blank means "not set").
-    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
-      Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
-    .foreach { case(destName, _localPath) =>
-      val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (! localPath.isEmpty()) {
-        var localURI = new URI(localPath)
-        // if not specified assume these are in the local filesystem to keep behavior like Hadoop
-        if (localURI.getScheme() == null) {
-          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
-        }
-        // Only the application jar has its permissions set explicitly after upload.
-        val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
-      }
-    }
-
-    // handle any add jars
-    // The link name is the URI fragment when present, otherwise the file name.
-    // NOTE(review): the trailing `true` is an extra addResource flag whose meaning is not
-    // visible here - confirm in ClientDistributedCacheManager.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, true)
-      }
-    }
-
-    // handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // handle any distributed cache archives
-    // Same as files, but registered with the ARCHIVE resource type.
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
-      }
-    }
-
-    // Make the collected tokens part of the current user's credentials.
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
-    return localResources
-  }
-
-  /**
-   * Build the environment for the ApplicationMaster container.
-   *
-   * @param localResources resources registered for the application; only consulted to see
-   *                       whether a log4j configuration was supplied
-   * @param stagingDir staging directory exported as `SPARK_YARN_STAGING_DIR`
-   * @return the environment variables to launch the container with
-   */
-  def setupLaunchEnv(
-      localResources: HashMap[String, LocalResource],
-      stagingDir: String): HashMap[String, String] = {
-    logInfo("Setting up the launch environment")
-    val hasLog4jConf = localResources.getOrElse(Client.LOG4J_PROP, null) != null
-
-    val launchEnv = new HashMap[String, String]()
-
-    Client.populateClasspath(yarnConf, hasLog4jConf, launchEnv)
-    launchEnv("SPARK_YARN_MODE") = "true"
-    launchEnv("SPARK_YARN_STAGING_DIR") = stagingDir
-
-    // Set the environment variables to be passed on to the Workers.
-    distCacheMgr.setDistFilesEnv(launchEnv)
-    distCacheMgr.setDistArchivesEnv(launchEnv)
-
-    // Allow users to specify some environment variables.
-    Apps.setEnvFromInputString(launchEnv, System.getenv("SPARK_YARN_USER_ENV"))
-
-    // Copy every environment variable whose name starts with SPARK; applied last, so these
-    // values replace any same-named entries set above.
-    for ((name, value) <- System.getenv().filterKeys(_.startsWith("SPARK"))) {
-      launchEnv(name) = value
-    }
-    launchEnv
-  }
-
-  /**
-   * Render the user arguments as a command-line fragment: each argument becomes
-   * ` --args  'value' `, in the original order. Embedded quotes are not escaped.
-   */
-  def userArgsToString(clientArgs: ClientArguments): String = {
-    val prefix = " --args "
-    clientArgs.userArgs.map(arg => prefix + " '" + arg + "' ").mkString
-  }
-
-  /**
-   * Build the launch context for the ApplicationMaster container: local resources, environment,
-   * the java command line, the memory request and the security tokens.
-   *
-   * @param newApp response to the new-application request; supplies the minimum resource
-   *               capability used to size the heap
-   * @param localResources resources to localize into the container
-   * @param env environment for the container; also consulted for `SPARK_USE_CONC_INCR_GC`,
-   *            `SPARK_JAVA_OPTS` and `JAVA_HOME`
-   * @return the populated container launch context
-   */
-  def createContainerLaunchContext(
-      newApp: GetNewApplicationResponse,
-      localResources: HashMap[String, LocalResource],
-      env: HashMap[String, String]): ContainerLaunchContext = {
-    logInfo("Setting up container launch context")
-    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
-    amContainer.setLocalResources(localResources)
-    amContainer.setEnvironment(env)
-
-    val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
-
-    // Heap size: the requested AM memory rounded up to a multiple of the minimum allocation,
-    // minus the memory overhead.
-    val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
-      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-        YarnAllocationHandler.MEMORY_OVERHEAD)
-
-    // Extra options for the JVM
-    var JAVA_OPTS = ""
-
-    // Add Xmx for am memory
-    JAVA_OPTS += "-Xmx" + amMemory + "m "
-
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove
-    // it once cpuset version is pushed out. The context is, default gc for server class machines
-    // end up using all cores to do gc - hence if there are multiple containers in same node,
-    // spark gc effects all other containers performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
-    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
-    // of cores on a node.
-    val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
-      java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
-    if (useConcurrentAndIncrementalGC) {
-      // In our expts, using (default) throughput collector has severe perf ramifications in
-      // multi-tenant machines
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
-    }
-
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
-    }
-
-    // Command for the ApplicationMaster. When JAVA_HOME is known on the client or in the
-    // container environment, use the container-side JAVA_HOME expansion instead of bare "java".
-    var javaCommand = "java"
-    val javaHome = System.getenv("JAVA_HOME")
-    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
-      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
-    }
-
-    val commands = List[String](javaCommand +
-      " -server " +
-      JAVA_OPTS +
-      " " + args.amClass +
-      " --class " + args.userClass +
-      " --jar " + args.userJar +
-      userArgsToString(args) +
-      " --worker-memory " + args.workerMemory +
-      " --worker-cores " + args.workerCores +
-      " --num-workers " + args.numWorkers +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-    logInfo("Command for the ApplicationMaster: " + commands(0))
-    amContainer.setCommands(commands)
-
-    val capability = Records.newRecord(classOf[Resource])
-    // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-    amContainer.setResource(capability)
-
-    // Setup security tokens. Wrap only the bytes that were written: getData() exposes the whole
-    // backing array, which is valid only up to getLength().
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()))
-
-    amContainer
-  }
-
-  /** Hand the fully populated submission context over to the applications manager. */
-  def submitApp(appContext: ApplicationSubmissionContext) = {
-    logInfo("Submitting application to ASM")
-    // Delegates to the YARN client implementation this class extends.
-    super.submitApplication(appContext)
-  }
-
-  /**
-   * Poll the application report until the application is FINISHED, FAILED or KILLED, logging
-   * every report. The polling interval in milliseconds comes from `spark.yarn.report.interval`
-   * (default 1000).
-   *
-   * @param appId application to watch
-   * @return always true, once a final state has been observed
-   */
-  def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-    val finalStates = Set(
-      YarnApplicationState.FINISHED,
-      YarnApplicationState.FAILED,
-      YarnApplicationState.KILLED)
-
-    var reachedFinalState = false
-    while (!reachedFinalState) {
-      // Sleep first, so the first report is fetched one interval after submission.
-      Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t application identifier: " + appId.toString() + "\n" +
-        "\t appId: " + appId.getId() + "\n" +
-        "\t clientToken: " + report.getClientToken() + "\n" +
-        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
-        "\t appMasterHost: " + report.getHost() + "\n" +
-        "\t appQueue: " + report.getQueue() + "\n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
-        "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
-        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
-        "\t appUser: " + report.getUser()
-      )
-
-      reachedFinalState = finalStates.contains(report.getYarnApplicationState())
-    }
-    true
-  }
-}
-
-/** Entry point and classpath helpers for the YARN client. */
-object Client {
-  // Link names under which the fixed resources appear in the container's working directory.
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
-  val LOG4J_PROP: String = "log4j.properties"
-
-  /** Parse the command line, submit the application and wait for it to finish. */
-  def main(argStrings: Array[String]) {
-    // Set a system property (not an environment variable) indicating we are running in YARN mode.
-    // Note that anything with SPARK prefix gets propagated to all (remote) processes
-    System.setProperty("SPARK_YARN_MODE", "true")
-
-    val args = new ClientArguments(argStrings)
-
-    new Client(args).run()
-  }
-
-  /**
-   * Append every entry of the configured YARN application classpath to CLASSPATH in `env`.
-   * Does nothing when the property is not set.
-   */
-  // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
-    // getStrings returns null when the property is absent; iterating over it directly would
-    // throw a NullPointerException.
-    Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)).foreach { entries =>
-      for (c <- entries) {
-        Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
-      }
-    }
-  }
-
-  /**
-   * Populate CLASSPATH in `env` for a Spark container. Entries are added in this order: the
-   * working directory, the log4j configuration (if any), the Spark jar, the Hadoop classpath
-   * and finally a wildcard over the working directory. The application jar goes before the
-   * Spark jar when `spark.yarn.user.classpath.first` is true, otherwise after the Hadoop
-   * classpath.
-   *
-   * @param conf configuration that supplies the YARN application classpath
-   * @param addLog4j whether a log4j configuration was shipped with the application
-   * @param env environment map to update
-   */
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP)
-    }
-    // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
-    if (userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR)
-    }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR)
-    Client.populateHadoopClasspath(conf, env)
-
-    if (!userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR)
-    }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
deleted file mode 100644
index 9075ca7..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.SparkConf
-import org.apache.spark.util.MemoryParam
-import org.apache.spark.util.IntParam
-import collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
-
-// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
-  // Comma separated lists, set by --addJars, --files and --archives; null when not given.
-  var addJars: String = null
-  var files: String = null
-  var archives: String = null
-  // Set by --jar and --class; both stay null when the option is absent.
-  var userJar: String = null
-  var userClass: String = null
-  // Values of every --args occurrence, in command-line order.
-  var userArgs: Seq[String] = Seq[String]()
-  // Worker sizing, set by --worker-memory, --worker-cores and --num-workers.
-  // NOTE(review): memory values are presumably in MB (the usage text lists this default as 1G)
-  // - confirm against MemoryParam.
-  var workerMemory = 1024
-  var workerCores = 1
-  var numWorkers = 2
-  // Queue default is read from the "QUEUE" key of a fresh SparkConf; overridden by --queue.
-  var amQueue = new SparkConf().get("QUEUE", "default")
-  // ApplicationMaster settings, set by --master-memory and --master-class.
-  var amMemory: Int = 512
-  var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
-  var appName: String = "Spark"
-  // TODO
-  // Always assigned an empty list by parseArgs: nothing populates it yet.
-  var inputFormatInfo: List[InputFormatInfo] = null
-
-  // Parsing runs as part of construction, so the fields above are final values afterwards.
-  parseArgs(args.toList)
-
-  /**
-   * Consume the command line option by option and store the values in the fields of this
-   * class. An unknown option, or a value that does not parse, prints the usage text and exits
-   * with status 1.
-   *
-   * Required options are not enforced here; `userJar` and `userClass` are simply left null
-   * when absent.
-   *
-   * @param inputArgs the raw command-line arguments
-   */
-  private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
-    val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
-
-    var args = inputArgs
-
-    // The loop only runs on a non-empty list, so no `case Nil` is needed (it was unreachable).
-    while (! args.isEmpty) {
-
-      args match {
-        case ("--jar") :: value :: tail =>
-          userJar = value
-          args = tail
-
-        case ("--class") :: value :: tail =>
-          userClass = value
-          args = tail
-
-        case ("--args") :: value :: tail =>
-          // May be given several times; the values are kept in order.
-          userArgsBuffer += value
-          args = tail
-
-        case ("--master-class") :: value :: tail =>
-          amClass = value
-          args = tail
-
-        case ("--master-memory") :: MemoryParam(value) :: tail =>
-          amMemory = value
-          args = tail
-
-        case ("--worker-memory") :: MemoryParam(value) :: tail =>
-          workerMemory = value
-          args = tail
-
-        case ("--num-workers") :: IntParam(value) :: tail =>
-          numWorkers = value
-          args = tail
-
-        case ("--worker-cores") :: IntParam(value) :: tail =>
-          workerCores = value
-          args = tail
-
-        case ("--queue") :: value :: tail =>
-          amQueue = value
-          args = tail
-
-        case ("--name") :: value :: tail =>
-          appName = value
-          args = tail
-
-        case ("--addJars") :: value :: tail =>
-          addJars = value
-          args = tail
-
-        case ("--files") :: value :: tail =>
-          files = value
-          args = tail
-
-        case ("--archives") :: value :: tail =>
-          archives = value
-          args = tail
-
-        case _ =>
-          // Unknown option, option without a value, or a value rejected by its extractor.
-          printUsageAndExit(1, args)
-      }
-    }
-
-    userArgs = userArgsBuffer.readOnly
-    inputFormatInfo = inputFormatMap.values.toList
-  }
-
-
-  /**
-   * Print the usage text to stderr and terminate the JVM.
-   *
-   * @param exitCode status passed to `System.exit`
-   * @param unknownParam when non-null, reported as the unknown or unsupported parameter before
-   *                     the usage text
-   */
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    // The defaults quoted below must match the field initialisers of this class.
-    System.err.println(
-      "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
-      "Options:\n" +
-      "  --jar JAR_PATH             Path to your application's JAR file (required)\n" +
-      "  --class CLASS_NAME         Name of your application's main class (required)\n" +
-      "  --args ARGS                Arguments to be passed to your application's main class.\n" +
-      "                             Multiple invocations are possible, each will be passed in order.\n" +
-      "  --num-workers NUM          Number of workers to start (Default: 2)\n" +
-      "  --worker-cores NUM         Number of cores for the workers (Default: 1). This is unused right now.\n" +
-      "  --master-class CLASS_NAME  Class Name for Master (Default: org.apache.spark.deploy.yarn.ApplicationMaster)\n" +
-      "  --master-memory MEM        Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
-      "  --worker-memory MEM        Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --name NAME                The name of your application (Default: Spark)\n" +
-      "  --queue QUEUE              The hadoop queue to use for allocation requests (Default: 'default')\n" +
-      "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
-      "  --files files              Comma separated list of files to be distributed with the job.\n" +
-      "  --archives archives        Comma separated list of archives to be distributed with the job."
-      )
-    System.exit(exitCode)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
deleted file mode 100644
index 5f159b0..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import org.apache.spark.Logging 
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.LinkedHashMap
-import scala.collection.mutable.Map
-
-
-/** Client side methods to setup the Hadoop distributed cache */
-class ClientDistributedCacheManager() extends Logging {
-  private val distCacheFiles: Map[String, Tuple3[String, String, String]] = 
-    LinkedHashMap[String, Tuple3[String, String, String]]()
-  private val distCacheArchives: Map[String, Tuple3[String, String, String]] = 
-    LinkedHashMap[String, Tuple3[String, String, String]]()
-
-
-  /**
-   * Add a resource to the list of distributed cache resources. This list can
-   * be sent to the ApplicationMaster and possibly the workers so that it can 
-   * be downloaded into the Hadoop distributed cache for use by this application.
-   * Adds the LocalResource to the localResources HashMap passed in and saves 
-   * the stats of the resources to they can be sent to the workers and verified.
-   *
-   * @param fs FileSystem
-   * @param conf Configuration
-   * @param destPath path to the resource
-   * @param localResources localResource hashMap to insert the resource into
-   * @param resourceType LocalResourceType 
-   * @param link link presented in the distributed cache to the destination
-   * @param statCache cache to store the file/directory stats 
-   * @param appMasterOnly Whether to only add the resource to the app master
-   */
-  def addResource(
-      fs: FileSystem,
-      conf: Configuration,
-      destPath: Path, 
-      localResources: HashMap[String, LocalResource],
-      resourceType: LocalResourceType,
-      link: String,
-      statCache: Map[URI, FileStatus],
-      appMasterOnly: Boolean = false) = {
-    val destStatus = fs.getFileStatus(destPath)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(resourceType)
-    val visibility = getVisibility(conf, destPath.toUri(), statCache)
-    amJarRsrc.setVisibility(visibility)
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
-    amJarRsrc.setTimestamp(destStatus.getModificationTime())
-    amJarRsrc.setSize(destStatus.getLen())
-    if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
-    localResources(link) = amJarRsrc
-    
-    if (appMasterOnly == false) {
-      val uri = destPath.toUri()
-      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
-      if (resourceType == LocalResourceType.FILE) {
-        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
-          destStatus.getModificationTime().toString(), visibility.name())
-      } else {
-        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), 
-          destStatus.getModificationTime().toString(), visibility.name())
-      }
-    }
-  }
-
-  /**
-   * Adds the necessary cache file env variables to the env passed in
-   * @param env
-   */
-  def setDistFilesEnv(env: Map[String, String]) = {
-    val (keys, tupleValues) = distCacheFiles.unzip
-    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
-    if (keys.size > 0) {
-      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
-        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
-        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
-        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
-    }
-  }
-
-  /**
-   * Adds the necessary cache archive env variables to the env passed in
-   * @param env
-   */
-  def setDistArchivesEnv(env: Map[String, String]) = {
-    val (keys, tupleValues) = distCacheArchives.unzip
-    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
-    if (keys.size > 0) {
-      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
-        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
-        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
-        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
-    }
-  }
-
-  /**
-   * Returns the local resource visibility depending on the cache file permissions
-   * @param conf
-   * @param uri
-   * @param statCache
-   * @return LocalResourceVisibility
-   */
-  def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
-      LocalResourceVisibility = {
-    if (isPublic(conf, uri, statCache)) {
-      return LocalResourceVisibility.PUBLIC 
-    } 
-    return LocalResourceVisibility.PRIVATE
-  }
-
-  /**
-   * Returns a boolean to denote whether a cache file is visible to all(public)
-   * or not
-   * @param conf
-   * @param uri
-   * @param statCache
-   * @return true if the path in the uri is visible to all, false otherwise
-   */
-  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
-    val fs = FileSystem.get(uri, conf)
-    val current = new Path(uri.getPath())
-    //the leaf level file should be readable by others
-    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
-      return false
-    }
-    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
-  }
-
-  /**
-   * Returns true if all ancestors of the specified path have the 'execute'
-   * permission set for all users (i.e. that other users can traverse
-   * the directory heirarchy to the given path)
-   * @param fs
-   * @param path
-   * @param statCache
-   * @return true if all ancestors have the 'execute' permission set for all users
-   */
-  def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, 
-      statCache: Map[URI, FileStatus]): Boolean =  {
-    var current = path
-    while (current != null) {
-      //the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
-        return false
-      }
-      current = current.getParent()
-    }
-    return true
-  }
-
-  /**
-   * Checks for a given path whether the Other permissions on it 
-   * imply the permission in the passed FsAction
-   * @param fs
-   * @param path
-   * @param action
-   * @param statCache
-   * @return true if the path in the uri is visible to all, false otherwise
-   */
-  def checkPermissionOfOther(fs: FileSystem, path: Path,
-      action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
-    val status = getFileStatus(fs, path.toUri(), statCache)
-    val perms = status.getPermission()
-    val otherAction = perms.getOtherAction()
-    if (otherAction.implies(action)) {
-      return true
-    }
-    return false
-  }
-
-  /**
-   * Checks to see if the given uri exists in the cache, if it does it 
-   * returns the existing FileStatus, otherwise it stats the uri, stores
-   * it in the cache, and returns the FileStatus.
-   * @param fs
-   * @param uri
-   * @param statCache
-   * @return FileStatus
-   */
-  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
-    val stat = statCache.get(uri) match {
-      case Some(existstat) => existstat
-      case None => 
-        val newStat = fs.getFileStatus(new Path(uri))
-        statCache.put(uri, newStat)
-        newStat
-    }
-    return stat
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
deleted file mode 100644
index a8de89c..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.SplitInfo
-
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
-
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
-  private val rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = null
-  private var appAttemptId: ApplicationAttemptId = null
-  private var reporterThread: Thread = null
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = null
-  private var driverClosed:Boolean = false
-  private val sparkConf = new SparkConf
-
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf)._1
-  var actor: ActorRef = null
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = null
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      driver ! "hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-    }
-  }
-
-  def run() {
-
-    appAttemptId = getApplicationAttemptId()
-    resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-      }
-    }
-
-    waitForSparkMaster()
-
-    // Allocate all containers
-    allocateWorkers()
-
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
-    reporterThread = launchReporterThread(interval)
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    return appAttemptId
-  }
-
-  private def registerWithResourceManager(): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
-    return resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
-
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-
-  private def allocateWorkers() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
-      args, preferredNodeLocationData, sparkConf)
-
-    logInfo("Allocating " + args.numWorkers + " workers.")
-    // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
-      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
-      Thread.sleep(100)
-    }
-
-    logInfo("All workers have launched.")
-
-  }
-
-  // TODO: We might want to extend this to allocate more containers in case they die !
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed) {
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
-            yarnAllocator.allocateContainers(missingWorkerCount)
-          }
-          else sendProgress()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    return t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateContainers(0)
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    resourceManager.finishApplicationMaster(finishReq)
-  }
-
-}
-
-
-object WorkerLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index 6a90cc5..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-
-import org.apache.spark.Logging
-
-
-class WorkerRunnable(
-    container: Container,
-    conf: Configuration,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    workerMemory: Int,
-    workerCores: Int) 
-  extends Runnable with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var cm: ContainerManager = null
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Worker Container")
-    cm = connectToCM
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    ctx.setContainerId(container.getId())
-    ctx.setResource(container.getResource())
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    val env = prepareEnvironment
-    ctx.setEnvironment(env)
-
-    // Extra options for the JVM
-    var JAVA_OPTS = ""
-    // Set the JVM memory
-    val workerMemoryString = workerMemory + "m"
-    JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
-    }
-
-    JAVA_OPTS += " -Djava.io.tmpdir=" + 
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove
-    // it once cpuset version is pushed out.
-    // The context is, default gc for server class machines end up using all cores to do gc - hence
-    // if there are multiple containers in same node, spark gc effects all other containers
-    // performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
-    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
-    // of cores on a node.
-/*
-    else {
-      // If no java_opts specified, default to using -XX:+CMSIncrementalMode
-      // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
-      // want to mess with it.
-      // In our expts, using (default) throughput collector has severe perf ramnifications in
-      // multi-tennent machines
-      // The options are based on
-      // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
-    }
-*/
-
-    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
-    var javaCommand = "java"
-    val javaHome = System.getenv("JAVA_HOME")
-    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
-      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
-    }
-
-    val commands = List[String](javaCommand +
-      " -server " +
-      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
-      // an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
-      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      workerCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-    logInfo("Setting up worker with commands: " + commands)
-    ctx.setCommands(commands)
-
-    // Send the start request to the ContainerManager
-    val startReq = Records.newRecord(classOf[StartContainerRequest])
-    .asInstanceOf[StartContainerRequest]
-    startReq.setContainerLaunchContext(ctx)
-    cm.startContainer(startReq)
-  }
-
-  private def setupDistributedCache(
-      file: String,
-      rtype: LocalResourceType,
-      localResources: HashMap[String, LocalResource],
-      timestamp: String,
-      size: String, 
-      vis: String) = {
-    val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
-    amJarRsrc.setTimestamp(timestamp.toLong)
-    amJarRsrc.setSize(size.toLong)
-    localResources(uri.getFragment()) = amJarRsrc
-  }
-
-  def prepareLocalResources: HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    val localResources = HashMap[String, LocalResource]()
-
-    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
-      for( i <- 0 to distFiles.length - 1) {
-        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i), visibilities(i))
-      }
-    }
-
-    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
-      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
-      for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
-          timeStamps(i), fileSizes(i), visibilities(i))
-      }
-    }
-
-    logInfo("Prepared Local resources " + localResources)
-    return localResources
-  }
-
-  def prepareEnvironment: HashMap[String, String] = {
-    val env = new HashMap[String, String]()
-
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
-
-    // Allow users to specify some environment variables
-    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    return env
-  }
-
-  def connectToCM: ContainerManager = {
-    val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
-    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
-    logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
-    // Use doAs and remoteUser here so we can add the container token and not pollute the current
-    // users credentials with all of the individual container tokens
-    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
-    val containerToken = container.getContainerToken()
-    if (containerToken != null) {
-      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
-    }
-
-    val proxy = user
-        .doAs(new PrivilegedExceptionAction[ContainerManager] {
-          def run: ContainerManager = {
-            return rpc.getProxy(classOf[ContainerManager],
-                cmAddress, conf).asInstanceOf[ContainerManager]
-          }
-        })
-    proxy
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index c8af653..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,680 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-// Locality level of a container request: a specific host, a rack, or any node.
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-
-  // Declaration order fixes the ids: HOST = 0, RACK = 1, ANY = 2.
-  val HOST = Value
-  val RACK = Value
-  val ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting containers.
-private[yarn] class YarnAllocationHandler(
-    val conf: Configuration,
-    val resourceManager: AMRMProtocol, 
-    val appAttemptId: ApplicationAttemptId,
-    val maxWorkers: Int,
-    val workerMemory: Int,
-    val workerCores: Int,
-    val preferredHostToCount: Map[String, Int], 
-    val preferredRackToCount: Map[String, Int],
-    val sparkConf: SparkConf)
-  extends Logging {
-  // These three are locked on allocatedHostToContainersMap. Complementary data structures
-  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
-  // allocatedContainerToHostMap: container to host mapping.
-  private val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]()
-
-  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
-  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
-  // allocated node)
-  // As with the two data structures above, tightly coupled with them, and to be locked on
-  // allocatedHostToContainersMap
-  private val allocatedRackCount = new HashMap[String, Int]()
-
-  // Containers which have been released.
-  private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
-  // Containers to be released in next request to RM
-  private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
-  private val numWorkersRunning = new AtomicInteger()
-  // Used to generate a unique id per worker
-  private val workerIdCounter = new AtomicInteger()
-  private val lastResponseId = new AtomicInteger()
-  private val numWorkersFailed = new AtomicInteger()
-
-  // Current value of the running-worker counter.
-  def getNumWorkersRunning: Int = numWorkersRunning.get()
-
-  // Current value of the failed-worker counter.
-  def getNumWorkersFailed: Int = numWorkersFailed.get()
-
-  // A container is usable only if it offers at least the worker memory plus the fixed overhead.
-  def isResourceConstraintSatisfied(container: Container): Boolean = {
-    val requiredMemory = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    container.getResource.getMemory >= requiredMemory
-  }
-
-  /**
-   * Performs one allocate round-trip with the ResourceManager and processes the response.
-   *
-   * Granted containers that satisfy the memory constraint are grouped into host-local,
-   * rack-local and off-rack candidates and launched (one WorkerRunnable thread each) until
-   * maxWorkers is reached; all other granted containers are queued on releasedContainerList so
-   * they are handed back with a later request. Completed containers reported in the response are
-   * removed from the bookkeeping maps; those we did not release ourselves decrement the running
-   * count and are counted as failed when their exit status is non-zero.
-   *
-   * @param workersToRequest number of worker containers to ask for in this request
-   */
-  def allocateContainers(workersToRequest: Int) {
-    // We need to send the request only once from what I understand ... but for now, not modifying
-    // this much.
-
-    // Keep polling the Resource Manager for containers
-    val amResp = allocateWorkerResources(workersToRequest).getAMResponse
-
-    val _allocatedContainers = amResp.getAllocatedContainers()
-
-    if (_allocatedContainers.size > 0) {
-      logDebug("""
-        Allocated containers: %d
-        Current worker count: %d
-        Containers released: %s
-        Containers to be released: %s
-        Cluster resources: %s
-        """.format(
-          _allocatedContainers.size,
-          numWorkersRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers,
-          amResp.getAvailableResources))
-
-      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      // Group usable containers by host; containers that are too small are released.
-      for (container <- _allocatedContainers) {
-        if (isResourceConstraintSatisfied(container)) {
-          val host = container.getNodeId.getHost
-          val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
-
-          containers += container
-        }
-        // Add all ignored containers to released list
-        else releasedContainerList.add(container.getId())
-      }
-
-      // Find the appropriate containers to use. Slightly non trivial groupBy ...
-      // dataLocalContainers and offRackContainers are keyed by host, rackLocalContainers by rack.
-      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (candidateHost <- hostToContainers.keySet)
-      {
-        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
-        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
-        // null means "everything on this host has been consumed by an earlier tier".
-        var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
-        assert(remainingContainers != null)
-
-        if (requiredHostCount >= remainingContainers.size){
-          // Since we got <= required containers, add all to dataLocalContainers
-          dataLocalContainers.put(candidateHost, remainingContainers)
-          // all consumed
-          remainingContainers = null
-        }
-        else if (requiredHostCount > 0) {
-          // Container list has more containers than we need for data locality.
-          // NOTE(review): splitAt keeps the first (size - requiredHostCount) containers as data
-          // local and releases the last requiredHostCount ones; keeping exactly
-          // requiredHostCount looks like the intent - confirm before changing.
-          val (dataLocal, remaining) = remainingContainers.splitAt(
-            remainingContainers.size - requiredHostCount)
-          dataLocalContainers.put(candidateHost, dataLocal)
-
-          // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
-          // add remaining to release list. If we have insufficient containers, next allocation
-          // cycle will reallocate (but wont treat it as data local)
-          for (container <- remaining) releasedContainerList.add(container.getId())
-          remainingContainers = null
-        }
-
-        // Now rack local
-        if (remainingContainers != null){
-          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
-
-          if (rack != null){
-            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
-              rackLocalContainers.get(rack).getOrElse(List()).size
-
-
-            if (requiredRackCount >= remainingContainers.size){
-              // Add all to rackLocalContainers. They must be appended to the rack's entry:
-              // storing them in dataLocalContainers under the rack name would let a second host
-              // of the same rack overwrite (and thereby lose) the first host's containers, and
-              // would hide them from the requiredRackCount computation above.
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= remainingContainers
-              // All consumed
-              remainingContainers = null
-            }
-            else if (requiredRackCount > 0) {
-              // container list has more containers than we need for rack locality.
-              // NOTE(review): as above, the first (size - requiredRackCount) containers become
-              // rack local and the last requiredRackCount fall through to off-rack - confirm.
-              val (rackLocal, remaining) = remainingContainers.splitAt(
-                remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= rackLocal
-              remainingContainers = remaining
-            }
-          }
-        }
-
-        // If still not consumed, then it is off rack host - add to that list.
-        if (remainingContainers != null){
-          offRackContainers.put(candidateHost, remainingContainers)
-        }
-      }
-
-      // Now that we have split the containers into various groups, go through them in order :
-      // first host local, then rack local and then off rack (everything else).
-      // Note that the list we create below tries to ensure that not all containers end up within a
-      // host if there are sufficiently large number of hosts/containers.
-
-      val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
-      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
-      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
-      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
-      // Run each of the allocated containers
-      for (container <- allocatedContainers) {
-        // Optimistically claim a slot; rolled back below if we are already at maxWorkers.
-        val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
-        val workerHostname = container.getNodeId.getHost
-        val containerId = container.getId
-
-        assert(
-          container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
-
-        if (numWorkersRunningNow > maxWorkers) {
-          logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, workerHostname))
-          releasedContainerList.add(containerId)
-          // reset counter back to old value.
-          numWorkersRunning.decrementAndGet()
-        }
-        else {
-          // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
-          // (workerIdCounter)
-          val workerId = workerIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
-            CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-          logInfo("launching container on " + containerId + " host " + workerHostname)
-          // Just to be safe, simply remove it from pendingReleaseContainers.
-          // Should not be there, but ..
-          pendingReleaseContainers.remove(containerId)
-
-          // Rack lookup happens outside the lock; only the map updates are guarded.
-          val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
-          allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
-              new HashSet[ContainerId]())
-
-            containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, workerHostname)
-            if (rack != null) {
-              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
-            }
-          }
-
-          new Thread(
-            new WorkerRunnable(container, conf, driverUrl, workerId,
-              workerHostname, workerMemory, workerCores)
-          ).start()
-        }
-      }
-      logDebug("""
-        Finished processing %d containers.
-        Current number of workers running: %d,
-        releasedContainerList: %s,
-        pendingReleaseContainers: %s
-        """.format(
-          allocatedContainers.size,
-          numWorkersRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers))
-    }
-
-
-    val completedContainers = amResp.getCompletedContainersStatuses()
-    if (completedContainers.size > 0){
-      logDebug("Completed %d containers, to-be-released: %s".format(
-        completedContainers.size, releasedContainerList))
-      for (completedContainer <- completedContainers){
-        val containerId = completedContainer.getContainerId
-
-        // Was this released by us ? If yes, then simply remove from containerSet and move on.
-        if (pendingReleaseContainers.containsKey(containerId)) {
-          pendingReleaseContainers.remove(containerId)
-        }
-        else {
-          // Simply decrement count - next iteration of ReporterThread will take care of allocating.
-          numWorkersRunning.decrementAndGet()
-          logInfo("Completed container %s (state: %s, exit status: %s)".format(
-            containerId,
-            completedContainer.getState,
-            completedContainer.getExitStatus()))
-          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
-          // there are some exit status' we shouldn't necessarily count against us, but for
-          // now I think its ok as none of the containers are expected to exit
-          if (completedContainer.getExitStatus() != 0) {
-            logInfo("Container marked as failed: " + containerId)
-            numWorkersFailed.incrementAndGet()
-          }
-        }
-
-        // Drop the container from the host / container / rack bookkeeping, if we tracked it.
-        allocatedHostToContainersMap.synchronized {
-          if (allocatedContainerToHostMap.containsKey(containerId)) {
-            val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
-            assert (host != null)
-
-            val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
-            assert (containerSet != null)
-
-            containerSet -= containerId
-            if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
-            else allocatedHostToContainersMap.update(host, containerSet)
-
-            allocatedContainerToHostMap -= containerId
-
-            // Doing this within locked context, sigh ... move to outside ?
-            val rack = YarnAllocationHandler.lookupRack(conf, host)
-            if (rack != null) {
-              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
-              else allocatedRackCount.remove(rack)
-            }
-          }
-        }
-      }
-      logDebug("""
-        Finished processing %d completed containers.
-        Current number of workers running: %d,
-        releasedContainerList: %s,
-        pendingReleaseContainers: %s
-        """.format(
-          completedContainers.size,
-          numWorkersRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers))
-    }
-  }
-
-  /**
-   * Derives rack-level requests from a list of host-level requests.
-   *
-   * The container counts of all hosts that resolve to the same rack are summed, and one RACK
-   * request is created per rack with that total. Hosts whose rack cannot be looked up are skipped.
-   *
-   * @param hostContainers host-level requests; none of them may target ANY_HOST
-   * @return one rack-level request per distinct rack
-   */
-  def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
-    val perRackTotals = new HashMap[String, Int]()
-
-    hostContainers.foreach { hostRequest =>
-      val host = hostRequest.getHostName
-      val wanted = hostRequest.getNumContainers
-      assert(YarnAllocationHandler.ANY_HOST != host)
-
-      // lookupRack yields null for hosts without a known rack.
-      Option(YarnAllocationHandler.lookupRack(conf, host)).foreach { rack =>
-        perRackTotals(rack) = perRackTotals.getOrElse(rack, 0) + wanted
-      }
-    }
-
-    val rackRequests = new ArrayBuffer[ResourceRequest](perRackTotals.size)
-    perRackTotals.foreach { case (rack, total) =>
-      rackRequests +=
-        createResourceRequest(AllocationType.RACK, rack, total, YarnAllocationHandler.PRIORITY)
-    }
-    rackRequests.toList
-  }
-
-  // Number of containers currently recorded for `host` (0 if none); read under the shared lock.
-  def allocatedContainersOnHost(host: String): Int = {
-    allocatedHostToContainersMap.synchronized {
-      allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0)
-    }
-  }
-
-  // Number of containers currently recorded for `rack` (0 if none); read under the shared lock.
-  def allocatedContainersOnRack(rack: String): Int = {
-    allocatedHostToContainersMap.synchronized {
-      allocatedRackCount.get(rack) match {
-        case Some(count) => count
-        case None => 0
-      }
-    }
-  }
-
-  /**
-   * Builds an AllocateRequest for `numWorkers` workers and sends it to the ResourceManager.
-   *
-   * Without host preferences (or when no workers are requested) a single ANY request is issued.
-   * Otherwise the request carries one HOST ask per preferred host that still lacks containers,
-   * the RACK asks derived from those, and a final ANY ask for `numWorkers`. Containers queued
-   * for release are attached to the same request.
-   *
-   * @param numWorkers number of worker containers to ask for; may be <= 0 for a release-only call
-   * @return the ResourceManager's response to the request
-   */
-  private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
-
-    val resourceRequests: List[ResourceRequest] =
-      if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
-        // default.
-        logDebug("numWorkers: " + numWorkers + ", host preferences: " +
-          preferredHostToCount.isEmpty)
-        List(
-          createResourceRequest(AllocationType.ANY, null, numWorkers,
-            YarnAllocationHandler.PRIORITY))
-      } else {
-        // Ask for every preferred host that is still short of containers ...
-        val hostAsks = new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
-        preferredHostToCount.foreach { case (host, wanted) =>
-          val missing = wanted - allocatedContainersOnHost(host)
-          if (missing > 0) {
-            hostAsks += createResourceRequest(
-              AllocationType.HOST, host, missing, YarnAllocationHandler.PRIORITY)
-          }
-        }
-        // ... then for the racks of those hosts, and finally for numWorkers containers anywhere.
-        val rackAsks = createRackResourceRequests(hostAsks.toList)
-        val anyAsk = createResourceRequest(
-          AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)
-
-        hostAsks.toList ++ rackAsks :+ anyAsk
-      }
-
-    val req = Records.newRecord(classOf[AllocateRequest])
-    req.setResponseId(lastResponseId.incrementAndGet)
-    req.setApplicationAttemptId(appAttemptId)
-    req.addAllAsks(resourceRequests)
-
-    // Drains the release queue; the drained ids are sent with this request.
-    val toRelease = createReleasedContainerList()
-    req.addAllReleases(toRelease)
-
-    if (numWorkers > 0) {
-      logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
-        workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
-    } else {
-      logDebug("Empty allocation req ..  release : " + toRelease)
-    }
-
-    resourceRequests.foreach { request =>
-      logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
-        format(
-          request.getHostName,
-          request.getNumContainers,
-          request.getPriority,
-          request.getCapability))
-    }
-    resourceManager.allocate(req)
-  }
-
-
-  /**
-   * Creates a single ResourceRequest for the given locality level.
-   *
-   * HOST requests target `resource` as a hostname and also record that host's rack in the
-   * host -> rack cache; RACK requests target `resource` as a rack name; ANY requests ignore
-   * `resource` and target ANY_HOST.
-   *
-   * @param requestType locality level of the request
-   * @param resource hostname or rack name, depending on `requestType` (unused for ANY)
-   * @param numWorkers number of containers to ask for
-   * @param priority priority value of the request
-   * @throws IllegalArgumentException if `requestType` is none of HOST, RACK or ANY
-   */
-  private def createResourceRequest(
-    requestType: AllocationType.AllocationType,
-    resource:String,
-    numWorkers: Int,
-    priority: Int): ResourceRequest = {
-
-    // If hostname specified, we need atleast two requests - node local and rack local.
-    // There must be a third request - which is ANY : that will be specially handled.
-    if (requestType == AllocationType.HOST) {
-      assert(YarnAllocationHandler.ANY_HOST != resource)
-      val nodeLocal = createResourceRequestImpl(resource, numWorkers, priority)
-
-      // Add to host->rack mapping
-      YarnAllocationHandler.populateRackInfo(conf, resource)
-
-      nodeLocal
-    } else if (requestType == AllocationType.RACK) {
-      createResourceRequestImpl(resource, numWorkers, priority)
-    } else if (requestType == AllocationType.ANY) {
-      createResourceRequestImpl(YarnAllocationHandler.ANY_HOST, numWorkers, priority)
-    } else {
-      throw new IllegalArgumentException(
-        "Unexpected/unsupported request type: " + requestType)
-    }
-  }
-
-  /**
-   * Assembles the ResourceRequest record for `hostname`.
-   *
-   * The requested memory is the worker memory plus MEMORY_OVERHEAD, and a negative
-   * `numWorkers` is clamped to 0.
-   *
-   * @param hostname host name, rack name or ANY_HOST the request is addressed to
-   * @param numWorkers number of containers to ask for
-   * @param priority priority value of the request
-   */
-  private def createResourceRequestImpl(
-    hostname:String,
-    numWorkers: Int,
-    priority: Int): ResourceRequest = {
-
-    // There probably is some overhead here, let's reserve a bit more memory.
-    val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-
-    val requestPriority = Records.newRecord(classOf[Priority])
-    requestPriority.setPriority(priority)
-
-    val request = Records.newRecord(classOf[ResourceRequest])
-    request.setCapability(capability)
-    request.setPriority(requestPriority)
-    request.setHostName(hostname)
-    request.setNumContainers(math.max(numWorkers, 0))
-    request
-  }
-
-  /**
-   * Drains the queue of containers waiting to be released.
-   *
-   * Every id currently in releasedContainerList is removed from it, marked in
-   * pendingReleaseContainers, and returned so that it can be attached to the next request.
-   *
-   * @return the drained container ids; empty when nothing was queued
-   */
-  def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
-
-    val drained = new ArrayBuffer[ContainerId](1)
-    // Iterator on COW list ...
-    drained ++= releasedContainerList.iterator()
-
-    if (drained.nonEmpty) {
-      // Remove from the original list.
-      releasedContainerList.removeAll(drained)
-      drained.foreach { id => pendingReleaseContainers.put(id, true) }
-      logInfo("Releasing " + drained.size + " containers. pendingReleaseContainers : " +
-        pendingReleaseContainers)
-    }
-
-    drained
-  }
-}
-
-object YarnAllocationHandler {
-
-  val ANY_HOST = "*"
-  // All requests are issued with same priority : we do not (yet) have any distinction between 
-  // request types (like map/reduce in hadoop for example)
-  val PRIORITY = 1
-
-  // Additional memory overhead - in mb
-  val MEMORY_OVERHEAD = 384
-
-  // Host to rack map - saved from allocation requests
-  // We are expecting this not to change.
-  // Note that it is possible for this to change : and RM will indicate that to us via update 
-  // response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
-  // Creates an allocator without any host or rack preferences, sized from the AM arguments.
-  def newAllocator(
-    conf: Configuration,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    sparkConf: SparkConf): YarnAllocationHandler = {
-
-    new YarnAllocationHandler(
-      conf = conf,
-      resourceManager = resourceManager,
-      appAttemptId = appAttemptId,
-      maxWorkers = args.numWorkers,
-      workerMemory = args.workerMemory,
-      workerCores = args.workerCores,
-      preferredHostToCount = Map[String, Int](),
-      preferredRackToCount = Map[String, Int](),
-      sparkConf = sparkConf)
-  }
-
-  // Creates an allocator sized from the AM arguments, with host and rack preferences derived
-  // from the given host -> splits map.
-  def newAllocator(
-    conf: Configuration,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    map: collection.Map[String,
-    collection.Set[SplitInfo]],
-    sparkConf: SparkConf): YarnAllocationHandler = {
-
-    val (hostPreferences, rackPreferences) = generateNodeToWeight(conf, map)
-
-    new YarnAllocationHandler(
-      conf = conf,
-      resourceManager = resourceManager,
-      appAttemptId = appAttemptId,
-      maxWorkers = args.numWorkers,
-      workerMemory = args.workerMemory,
-      workerCores = args.workerCores,
-      preferredHostToCount = hostPreferences,
-      preferredRackToCount = rackPreferences,
-      sparkConf = sparkConf)
-  }
-
-  // Creates an allocator from explicit sizing values, with host and rack preferences derived
-  // from the given host -> splits map.
-  def newAllocator(
-    conf: Configuration,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    maxWorkers: Int,
-    workerMemory: Int,
-    workerCores: Int,
-    map: collection.Map[String, collection.Set[SplitInfo]],
-    sparkConf: SparkConf): YarnAllocationHandler = {
-
-    val (hostPreferences, rackPreferences) = generateNodeToWeight(conf, map)
-
-    new YarnAllocationHandler(
-      conf = conf,
-      resourceManager = resourceManager,
-      appAttemptId = appAttemptId,
-      maxWorkers = maxWorkers,
-      workerMemory = workerMemory,
-      workerCores = workerCores,
-      preferredHostToCount = hostPreferences,
-      preferredRackToCount = rackPreferences,
-      sparkConf = sparkConf)
-  }
-
-  // Summarises the host -> splits map as preference weights.
-  // For every host the number of its splits is added to that host's count and, when the host's
-  // rack can be looked up, to the count of that rack (so hosts sharing a rack are aggregated).
-  // A null input yields two empty maps.
-  private def generateNodeToWeight(
-    conf: Configuration,
-    input: collection.Map[String, collection.Set[SplitInfo]]) :
-  // host to count, rack to count
-  (Map[String, Int], Map[String, Int]) = {
-
-    if (input == null) {
-      (Map[String, Int](), Map[String, Int]())
-    } else {
-      val hostToCount = new HashMap[String, Int]
-      val rackToCount = new HashMap[String, Int]
-
-      for ((host, splits) <- input) {
-        hostToCount.put(host, hostToCount.getOrElse(host, 0) + splits.size)
-
-        val rack = lookupRack(conf, host)
-        if (rack != null) {
-          // Key by rack, not by host: the result is consumed as a rack -> count map.
-          rackToCount.put(rack, rackToCount.getOrElse(rack, 0) + splits.size)
-        }
-      }
-
-      (hostToCount.toMap, rackToCount.toMap)
-    }
-  }
-
-  /**
-   * Returns the rack of `host`, resolving and caching it first if it is not cached yet.
-   *
-   * @param conf configuration handed to the rack resolution
-   * @param host hostname to look up
-   * @return the cached rack name, or null when no rack is known for the host
-   */
-  def lookupRack(conf: Configuration, host: String): String = {
-    // containsKey, not contains: on ConcurrentHashMap `contains` tests the values, so the
-    // cache check would never hit for a host name.
-    if (!hostToRack.containsKey(host)) populateRackInfo(conf, host)
-    hostToRack.get(host)
-  }
-
-  /**
-   * Returns the hosts cached so far for `rack`.
-   *
-   * @return an immutable copy of the rack's host set, or None when the rack is not cached
-   */
-  def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
-    Option(rackToHostSet.get(rack)).map { javaHosts =>
-      // No better way to get a Set[String] from JSet ?
-      val scalaHosts: collection.mutable.Set[String] = javaHosts
-      scalaHosts.toSet
-    }
-  }
-
-  /**
-   * Resolves the rack of `hostname` via RackResolver and records it in both the host -> rack
-   * and the rack -> hosts caches. Hosts that are already cached are left untouched.
-   *
-   * Hosts for which no network location is resolved are intentionally not cached (the
-   * original code notes that RackResolver caches by itself), so they are resolved again on
-   * the next call.
-   */
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, add to an ignore list ?
-      val resolvedRack = Option(RackResolver.resolve(conf, hostname))
-        .flatMap(node => Option(node.getNetworkLocation))
-
-      resolvedRack.foreach { rack =>
-        hostToRack.put(hostname, rack)
-        if (!rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
deleted file mode 100644
index 2ba2366..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.conf.Configuration
-
-/**
- * YARN flavour of [[SparkHadoopUtil]]: reports YARN mode, creates YARN configurations and
- * copies the current user's credentials into job configurations.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
-
-  // Note that all params which start with SPARK are propagated all the way through, so if in
-  // yarn mode, this MUST be set to true.
-  override def isYarnMode(): Boolean = true
-
-  // Returns an appropriate subclass of Configuration. Creating a config can initialize some
-  // hadoop subsystems. Always creates a new config, does not reuse yarnConf.
-  override def newConfiguration(): Configuration = {
-    val baseConf = new Configuration()
-    new YarnConfiguration(baseConf)
-  }
-
-  // Adds the current user's credentials to the job conf; these are necessary for running on a
-  // secure Hadoop cluster.
-  override def addCredentials(conf: JobConf) {
-    conf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
-  }
-}