You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 08:16:38 UTC

[26/32] Using name yarn-alpha/yarn instead of yarn-2.0/yarn-2.2

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
new file mode 100644
index 0000000..e645307
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.Socket
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import akka.actor._
+import akka.remote._
+import akka.actor.Terminated
+import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.SplitInfo
+
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+
+  // RPC factory and the handles that are filled in by run() as the launcher progresses.
+  private val rpc: YarnRPC = YarnRPC.create(conf)
+  private var resourceManager: AMRMProtocol = _
+  private var appAttemptId: ApplicationAttemptId = _
+  private var reporterThread: Thread = _
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  // Set to true by MonitorActor.receive when the driver disassociates and polled by the loop of
+  // the reporter thread; @volatile so that the write becomes visible to the polling thread.
+  @volatile private var driverClosed: Boolean = false
+  private val sparkConf = new SparkConf
+
+  // Actor system hosting the monitor actor; `actor` is assigned in waitForSparkMaster().
+  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+    conf = sparkConf)._1
+  var actor: ActorRef = _
+
+  // Monitor actor: its only job is to watch the driver actor and notice when it goes away.
+  class MonitorActor(driverUrl: String) extends Actor {
+
+    var driver: ActorSelection = _
+
+    override def preStart(): Unit = {
+      logInfo(s"Listen to driver: $driverUrl")
+      driver = context.actorSelection(driverUrl)
+      // Send a hello message so that the connection is actually established; only then can the
+      // remoting lifecycle events be monitored.
+      driver ! "Hello"
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+
+    // Only disassociation is handled: it flags the driver as closed for the reporter loop.
+    override def receive = {
+      case disassociated: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $disassociated")
+        driverClosed = true
+    }
+  }
+
+  /**
+   * Runs the launcher from start to finish: registers with the ResourceManager, waits for the
+   * driver, allocates the workers, keeps reporting until the reporter thread ends, then
+   * unregisters, shuts the actor system down and exits the JVM.
+   */
+  def run(): Unit = {
+
+    appAttemptId = getApplicationAttemptId()
+    resourceManager = registerWithResourceManager()
+    val registration: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+    // Compute number of threads for akka
+    val minMemory = registration.getMinimumResourceCapability().getMemory()
+
+    if (minMemory > 0) {
+      val requiredMemory = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      // Ceiling division: one extra slot when the memory is not an exact multiple.
+      val roundUp = if ((requiredMemory % minMemory) != 0) 1 else 0
+      val numCore = (requiredMemory / minMemory) + roundUp
+
+      if (numCore > 0) {
+        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+        // TODO: Uncomment when hadoop is on a version which has this fixed.
+        // args.workerCores = numCore
+      }
+    }
+
+    waitForSparkMaster()
+
+    // Allocate all containers
+    allocateWorkers()
+
+    // A progress reporter thread is needed, otherwise the app gets killed once the expiry
+    // timeout passes: progress must be sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
+    // elapses.
+    val amExpiryMs = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    // Stay reasonably responsive without flooding the RM with requests: at least one minute or
+    // a tenth of the expiry interval, whichever is higher ...
+    val responsiveFloor = math.max(amExpiryMs / 10, 60000L)
+    // ... but never more than half of the expiry interval.
+    val reportIntervalMs = math.min(amExpiryMs / 2, responsiveFloor)
+    reporterThread = launchReporterThread(reportIntervalMs)
+
+    // Block until the reporter thread is done.
+    reporterThread.join()
+
+    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+    actorSystem.shutdown()
+
+    logInfo("Exited")
+    System.exit(0)
+  }
+
+  /** Derives the application attempt id from the AM container id found in the environment. */
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val amContainerId = ConverterUtils.toContainerId(
+      System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+    val attemptId = amContainerId.getApplicationAttemptId()
+    logInfo(s"ApplicationAttemptId: $attemptId")
+    attemptId
+  }
+
+  /** Creates an AMRMProtocol proxy for the scheduler address taken from the YARN configuration. */
+  private def registerWithResourceManager(): AMRMProtocol = {
+    val schedulerAddress = yarnConf.get(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)
+    val rmAddress = NetUtils.createSocketAddr(schedulerAddress)
+    logInfo(s"Connecting to ResourceManager at $rmAddress")
+    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+  }
+
+  /**
+   * Sends the registration request for this application attempt to the ResourceManager.
+   *
+   * @return the response of the ResourceManager to the registration
+   */
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    val request = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+      .asInstanceOf[RegisterApplicationMasterRequest]
+    request.setApplicationAttemptId(appAttemptId)
+    // Report the local host name (with port 0) so that the ApplicationReport at the client has
+    // some sensible info; users can then monitor stderr/stdout on that node if required.
+    request.setHost(Utils.localHostName())
+    request.setRpcPort(0)
+    // No tracking URL for now - might make sense to expose something sensible later.
+    request.setTrackingUrl("")
+    resourceManager.registerApplicationMaster(request)
+  }
+
+  /**
+   * Blocks until a TCP connection to the driver (host:port given as the first user argument)
+   * succeeds, then records the driver address in the Spark configuration and starts the
+   * monitor actor that watches the driver.
+   */
+  private def waitForSparkMaster(): Unit = {
+    logInfo("Waiting for spark driver to be reachable.")
+    val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
+
+    var reachable = false
+    while (!reachable) {
+      try {
+        // Opening and immediately closing a socket is enough to prove the driver is listening.
+        new Socket(driverHost, driverPort).close()
+        logInfo(s"Master now available: $driverHost:$driverPort")
+        reachable = true
+      } catch {
+        case e: Exception =>
+          logError(s"Failed to connect to driver at $driverHost:$driverPort")
+          // Back off briefly before the next attempt.
+          Thread.sleep(100)
+      }
+    }
+
+    sparkConf.set("spark.driver.host",  driverHost)
+    sparkConf.set("spark.driver.port",  driverPort.toString)
+
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+  }
+
+
+  /**
+   * Creates the allocator and keeps requesting containers until the allocator reports that
+   * `args.numWorkers` workers are running.
+   */
+  private def allocateWorkers(): Unit = {
+
+    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map()
+
+    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
+      args, preferredNodeLocationData, sparkConf)
+
+    logInfo(s"Allocating ${args.numWorkers} workers.")
+    // Poll until the requested number of workers is running.
+    // TODO: This is a bit ugly. Can we make it nicer?
+    // TODO: Handle container failure
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+      val stillNeeded = args.numWorkers - yarnAllocator.getNumWorkersRunning
+      yarnAllocator.allocateContainers(math.max(stillNeeded, 0))
+      Thread.sleep(100)
+    }
+
+    logInfo("All workers have launched.")
+
+  }
+
+  // TODO: We might want to extend this to allocate more containers in case they die !
+  /**
+   * Starts a daemon thread that, until the driver is flagged as closed, either requests
+   * containers for missing workers or sends a progress report, pausing between rounds.
+   *
+   * @param _sleepTime pause between rounds in milliseconds; values <= 0 are treated as 0
+   * @return the started thread
+   */
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = math.max(_sleepTime, 0L)
+
+    val reporter = new Thread {
+      override def run(): Unit = {
+        while (!driverClosed) {
+          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+          if (missingWorkerCount <= 0) {
+            sendProgress()
+          } else {
+            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+            yarnAllocator.allocateContainers(missingWorkerCount)
+          }
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // setting to daemon status, though this is usually not a good idea.
+    reporter.setDaemon(true)
+    reporter.start()
+    logInfo(s"Started progress reporter thread - sleep time : $sleepTime")
+    reporter
+  }
+
+  /** Reports progress to the ResourceManager via an allocate request for zero containers. */
+  private def sendProgress(): Unit = {
+    logDebug("Sending progress")
+    // There is no dedicated call used here: an allocate request asking for nothing stands in.
+    yarnAllocator.allocateContainers(0)
+  }
+
+  /**
+   * Tells the ResourceManager that this application attempt has finished.
+   *
+   * @param status final status to report for the application
+   */
+  def finishApplicationMaster(status: FinalApplicationStatus): Unit = {
+    logInfo(s"finish ApplicationMaster with $status")
+    val request = Records.newRecord(classOf[FinishApplicationMasterRequest])
+      .asInstanceOf[FinishApplicationMasterRequest]
+    request.setAppAttemptId(appAttemptId)
+    request.setFinishApplicationStatus(status)
+    resourceManager.finishApplicationMaster(request)
+  }
+
+}
+
+
+object WorkerLauncher {
+  /** JVM entry point: parses the command line arguments and runs a launcher with them. */
+  def main(argStrings: Array[String]): Unit = {
+    val launcher = new WorkerLauncher(new ApplicationMasterArguments(argStrings))
+    launcher.run()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
new file mode 100644
index 0000000..4f34bd9
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
+
+import org.apache.spark.Logging
+
+
+class WorkerRunnable(
+    container: Container,
+    conf: Configuration,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    workerMemory: Int,
+    workerCores: Int) 
+  extends Runnable with Logging {
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  var cm: ContainerManager = _
+  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  /** Connects to the ContainerManager of the container and then starts the worker in it. */
+  def run = {
+    logInfo("Starting Worker Container")
+    val containerManager = connectToCM
+    cm = containerManager
+    startContainer
+  }
+
+  /**
+   * Builds the ContainerLaunchContext for this worker (local resources, environment,
+   * credentials and the java command line running CoarseGrainedExecutorBackend) and submits
+   * it to the ContainerManager held in `cm`.
+   *
+   * @return the result of `cm.startContainer` for the start request
+   */
+  def startContainer = {
+    logInfo("Setting up ContainerLaunchContext")
+
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+
+    ctx.setContainerId(container.getId())
+    ctx.setResource(container.getResource())
+    val localResources = prepareLocalResources
+    ctx.setLocalResources(localResources)
+
+    val env = prepareEnvironment
+    ctx.setEnvironment(env)
+
+    // Extra options for the JVM
+    var JAVA_OPTS = ""
+    // Set the JVM memory
+    val workerMemoryString = workerMemory + "m"
+    JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+    }
+
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
+    // Commenting it out for now - so that people can refer to the properties if required. Remove
+    // it once cpuset version is pushed out.
+    // The context is, default gc for server class machines end up using all cores to do gc - hence
+    // if there are multiple containers in same node, spark gc effects all other containers
+    // performance (which can also be other spark containers)
+    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+    // of cores on a node.
+/*
+    else {
+      // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+      // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
+      // want to mess with it.
+      // In our expts, using (default) throughput collector has severe perf ramnifications in
+      // multi-tennent machines
+      // The options are based on
+      // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+      JAVA_OPTS += " -XX:+CMSIncrementalMode "
+      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+    }
+*/
+
+    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    // DataOutputBuffer.getData() hands out the whole backing array, of which only the first
+    // getLength() bytes hold the serialized credentials - wrap just that valid prefix instead
+    // of shipping the unused tail of the buffer as well.
+    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()))
+
+    // Use $JAVA_HOME/bin/java when JAVA_HOME is set here or in the container environment,
+    // otherwise fall back to whatever "java" resolves to.
+    var javaCommand = "java"
+    val javaHome = System.getenv("JAVA_HOME")
+    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand +
+      " -server " +
+      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+      // an inconsistent state.
+      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
+      " -XX:OnOutOfMemoryError='kill %p' " +
+      JAVA_OPTS +
+      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
+      masterAddress + " " +
+      slaveId + " " +
+      hostname + " " +
+      workerCores +
+      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+    logInfo("Setting up worker with commands: " + commands)
+    ctx.setCommands(commands)
+
+    // Send the start request to the ContainerManager
+    val startReq = Records.newRecord(classOf[StartContainerRequest])
+    .asInstanceOf[StartContainerRequest]
+    startReq.setContainerLaunchContext(ctx)
+    cm.startContainer(startReq)
+  }
+
+  /**
+   * Builds a LocalResource for `file` and registers it in `localResources` under the fragment
+   * of the file URI.
+   *
+   * @param file URI of the resource, as a string
+   * @param rtype resource type to set on the record
+   * @param localResources map that receives the new entry
+   * @param timestamp timestamp of the resource, parsed as a long
+   * @param size size of the resource, parsed as a long
+   * @param vis name of a LocalResourceVisibility constant
+   */
+  private def setupDistributedCache(
+      file: String,
+      rtype: LocalResourceType,
+      localResources: HashMap[String, LocalResource],
+      timestamp: String,
+      size: String,
+      vis: String) = {
+    val resourceUri = new URI(file)
+    val resource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    resource.setType(rtype)
+    resource.setVisibility(LocalResourceVisibility.valueOf(vis))
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(resourceUri))
+    resource.setTimestamp(timestamp.toLong)
+    resource.setSize(size.toLong)
+    localResources(resourceUri.getFragment()) = resource
+  }
+
+  /**
+   * Collects the local resources described by the SPARK_YARN_CACHE_FILES* and
+   * SPARK_YARN_CACHE_ARCHIVES* environment variables (comma separated, index aligned lists).
+   *
+   * @return map of the prepared resources, as filled in by setupDistributedCache
+   */
+  def prepareLocalResources: HashMap[String, LocalResource] = {
+    logInfo("Preparing Local resources")
+    val resources = HashMap[String, LocalResource]()
+
+    // Plain files
+    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+      for (idx <- distFiles.indices) {
+        setupDistributedCache(distFiles(idx), LocalResourceType.FILE, resources,
+          timeStamps(idx), fileSizes(idx), visibilities(idx))
+      }
+    }
+
+    // Archives
+    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+      for (idx <- distArchives.indices) {
+        setupDistributedCache(distArchives(idx), LocalResourceType.ARCHIVE, resources,
+          timeStamps(idx), fileSizes(idx), visibilities(idx))
+      }
+    }
+
+    logInfo("Prepared Local resources " + resources)
+    resources
+  }
+
+  /**
+   * Builds the environment for the worker: the classpath, the variables given through
+   * SPARK_YARN_USER_ENV and every variable of this process whose name starts with "SPARK".
+   *
+   * @return the environment map
+   */
+  def prepareEnvironment: HashMap[String, String] = {
+    val env = new HashMap[String, String]()
+
+    val hasLog4jPath = System.getenv("SPARK_YARN_LOG4J_PATH") != null
+    Client.populateClasspath(yarnConf, hasLog4jPath, env)
+
+    // Allow users to specify some environment variables
+    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
+    // Forward every SPARK* variable of this process; these are applied last.
+    System.getenv().foreach { case (key, value) =>
+      if (key.startsWith("SPARK")) env(key) = value
+    }
+    env
+  }
+
+  /**
+   * Creates a ContainerManager proxy for the node hosting the container.
+   *
+   * @return the proxy, created as a remote user named after the container id
+   */
+  def connectToCM: ContainerManager = {
+    val nodeId = container.getNodeId()
+    val cmHostPortStr = nodeId.getHost() + ":" + nodeId.getPort()
+    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
+    logInfo(s"Connecting to ContainerManager at $cmHostPortStr")
+
+    // Use doAs and remoteUser here so we can add the container token and not pollute the current
+    // users credentials with all of the individual container tokens
+    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+    Option(container.getContainerToken()).foreach { token =>
+      user.addToken(ProtoUtils.convertFromProtoFormat(token, cmAddress))
+    }
+
+    user.doAs(new PrivilegedExceptionAction[ContainerManager] {
+      def run: ContainerManager =
+        rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
+    })
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
new file mode 100644
index 0000000..c8af653
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Boolean => JBoolean}
+import java.util.{Collections, Set => JSet}
+import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.AMRMProtocol
+import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
+import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.util.{RackResolver, Records}
+
+
+/** The three kinds of allocation: on a specific host, on a specific rack, or anywhere. */
+object AllocationType extends Enumeration {
+  type AllocationType = Value
+  val HOST = Value
+  val RACK = Value
+  val ANY = Value
+}
+
+// TODO:
+// Too many params.
+// Needs to be mt-safe
+// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+// make it more proactive and decoupled.
+
+// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we are requesting containers.
+private[yarn] class YarnAllocationHandler(
+    val conf: Configuration,
+    val resourceManager: AMRMProtocol, 
+    val appAttemptId: ApplicationAttemptId,
+    val maxWorkers: Int,
+    val workerMemory: Int,
+    val workerCores: Int,
+    val preferredHostToCount: Map[String, Int], 
+    val preferredRackToCount: Map[String, Int],
+    val sparkConf: SparkConf)
+  extends Logging {
+  // These three are locked on allocatedHostToContainersMap. Complementary data structures
+  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
+  // allocatedContainerToHostMap: container to host mapping.
+  private val allocatedHostToContainersMap =
+    new HashMap[String, collection.mutable.Set[ContainerId]]()
+
+  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+
+  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+  // allocated node)
+  // As with the two data structures above, tightly coupled with them, and to be locked on
+  // allocatedHostToContainersMap
+  private val allocatedRackCount = new HashMap[String, Int]()
+
+  // Containers which have been released.
+  private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
+  // Containers to be released in next request to RM
+  private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+
+  private val numWorkersRunning = new AtomicInteger()
+  // Used to generate a unique id per worker
+  private val workerIdCounter = new AtomicInteger()
+  private val lastResponseId = new AtomicInteger()
+  private val numWorkersFailed = new AtomicInteger()
+
+  /** Current value of the running-worker counter. */
+  def getNumWorkersRunning: Int = numWorkersRunning.get()
+
+  /** Current value of the failed-worker counter. */
+  def getNumWorkersFailed: Int = numWorkersFailed.get()
+
+  /**
+   * Checks whether the memory of `container` covers the worker memory plus
+   * YarnAllocationHandler.MEMORY_OVERHEAD.
+   */
+  def isResourceConstraintSatisfied(container: Container): Boolean = {
+    val requiredMemory = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    container.getResource.getMemory >= requiredMemory
+  }
+
+  def allocateContainers(workersToRequest: Int) {
+    // We need to send the request only once from what I understand ... but for now, not modifying
+    // this much.
+
+    // Keep polling the Resource Manager for containers
+    val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+
+    val _allocatedContainers = amResp.getAllocatedContainers()
+
+    if (_allocatedContainers.size > 0) {
+      logDebug("""
+        Allocated containers: %d
+        Current worker count: %d
+        Containers released: %s
+        Containers to be released: %s
+        Cluster resources: %s
+        """.format(
+          _allocatedContainers.size,
+          numWorkersRunning.get(),
+          releasedContainerList,
+          pendingReleaseContainers,
+          amResp.getAvailableResources))
+
+      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      // Ignore if not satisfying constraints      {
+      for (container <- _allocatedContainers) {
+        if (isResourceConstraintSatisfied(container)) {
+          // allocatedContainers += container
+
+          val host = container.getNodeId.getHost
+          val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
+
+          containers += container
+        }
+        // Add all ignored containers to released list
+        else releasedContainerList.add(container.getId())
+      }
+
+      // Find the appropriate containers to use. Slightly non trivial groupBy ...
+      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      for (candidateHost <- hostToContainers.keySet)
+      {
+        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+
+        var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
+        assert(remainingContainers != null)
+
+        if (requiredHostCount >= remainingContainers.size){
+          // Since we got <= required containers, add all to dataLocalContainers
+          dataLocalContainers.put(candidateHost, remainingContainers)
+          // all consumed
+          remainingContainers = null
+        }
+        else if (requiredHostCount > 0) {
+          // Container list has more containers than we need for data locality.
+          // Split into two : data local container count of (remainingContainers.size -
+          // requiredHostCount) and rest as remainingContainer
+          val (dataLocal, remaining) = remainingContainers.splitAt(
+            remainingContainers.size - requiredHostCount)
+          dataLocalContainers.put(candidateHost, dataLocal)
+          // remainingContainers = remaining
+
+          // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
+          // add remaining to release list. If we have insufficient containers, next allocation 
+          // cycle will reallocate (but wont treat it as data local)
+          for (container <- remaining) releasedContainerList.add(container.getId())
+          remainingContainers = null
+        }
+
+        // Now rack local
+        if (remainingContainers != null){
+          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+
+          if (rack != null){
+            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - 
+              rackLocalContainers.get(rack).getOrElse(List()).size
+
+
+            if (requiredRackCount >= remainingContainers.size){
+              // Add all to dataLocalContainers
+              dataLocalContainers.put(rack, remainingContainers)
+              // All consumed
+              remainingContainers = null
+            }
+            else if (requiredRackCount > 0) {
+              // container list has more containers than we need for data locality.
+              // Split into two : data local container count of (remainingContainers.size -
+              // requiredRackCount) and rest as remainingContainer
+              val (rackLocal, remaining) = remainingContainers.splitAt(
+                remainingContainers.size - requiredRackCount)
+              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+                new ArrayBuffer[Container]())
+
+              existingRackLocal ++= rackLocal
+              remainingContainers = remaining
+            }
+          }
+        }
+
+        // If still not consumed, then it is off rack host - add to that list.
+        if (remainingContainers != null){
+          offRackContainers.put(candidateHost, remainingContainers)
+        }
+      }
+
+      // Now that we have split the containers into various groups, go through them in order : 
+      // first host local, then rack local and then off rack (everything else).
+      // Note that the list we create below tries to ensure that not all containers end up within a
+      // host if there are sufficiently large number of hosts/containers.
+
+      val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
+      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+
+      // Run each of the allocated containers
+      for (container <- allocatedContainers) {
+        val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
+        val workerHostname = container.getNodeId.getHost
+        val containerId = container.getId
+
+        assert(
+          container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+
+        if (numWorkersRunningNow > maxWorkers) {
+          logInfo("""Ignoring container %s at host %s, since we already have the required number of
+            containers for it.""".format(containerId, workerHostname))
+          releasedContainerList.add(containerId)
+          // reset counter back to old value.
+          numWorkersRunning.decrementAndGet()
+        }
+        else {
+          // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
+          // (workerIdCounter)
+          val workerId = workerIdCounter.incrementAndGet().toString
+          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+            sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
+            CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+          logInfo("launching container on " + containerId + " host " + workerHostname)
+          // Just to be safe, simply remove it from pendingReleaseContainers.
+          // Should not be there, but ..
+          pendingReleaseContainers.remove(containerId)
+
+          val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+          allocatedHostToContainersMap.synchronized {
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+              new HashSet[ContainerId]())
+
+            containerSet += containerId
+            allocatedContainerToHostMap.put(containerId, workerHostname)
+            if (rack != null) {
+              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+            }
+          }
+
+          new Thread(
+            new WorkerRunnable(container, conf, driverUrl, workerId,
+              workerHostname, workerMemory, workerCores)
+          ).start()
+        }
+      }
+      logDebug("""
+        Finished processing %d containers.
+        Current number of workers running: %d,
+        releasedContainerList: %s,
+        pendingReleaseContainers: %s
+        """.format(
+          allocatedContainers.size,
+          numWorkersRunning.get(),
+          releasedContainerList,
+          pendingReleaseContainers))
+    }
+
+
+    val completedContainers = amResp.getCompletedContainersStatuses()
+    if (completedContainers.size > 0){
+      logDebug("Completed %d containers, to-be-released: %s".format(
+        completedContainers.size, releasedContainerList))
+      for (completedContainer <- completedContainers){
+        val containerId = completedContainer.getContainerId
+
+        // Was this released by us ? If yes, then simply remove from containerSet and move on.
+        if (pendingReleaseContainers.containsKey(containerId)) {
+          pendingReleaseContainers.remove(containerId)
+        }
+        else {
+          // Simply decrement count - next iteration of ReporterThread will take care of allocating.
+          numWorkersRunning.decrementAndGet()
+          logInfo("Completed container %s (state: %s, exit status: %s)".format(
+            containerId,
+            completedContainer.getState,
+            completedContainer.getExitStatus()))
+          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+          // there are some exit status' we shouldn't necessarily count against us, but for
+          // now I think its ok as none of the containers are expected to exit
+          if (completedContainer.getExitStatus() != 0) {
+            logInfo("Container marked as failed: " + containerId)
+            numWorkersFailed.incrementAndGet()
+          }
+        }
+
+        allocatedHostToContainersMap.synchronized {
+          if (allocatedContainerToHostMap.containsKey(containerId)) {
+            val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
+            assert (host != null)
+
+            val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
+            assert (containerSet != null)
+
+            containerSet -= containerId
+            if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
+            else allocatedHostToContainersMap.update(host, containerSet)
+
+            allocatedContainerToHostMap -= containerId
+
+            // Doing this within locked context, sigh ... move to outside ?
+            val rack = YarnAllocationHandler.lookupRack(conf, host)
+            if (rack != null) {
+              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+              if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
+              else allocatedRackCount.remove(rack)
+            }
+          }
+        }
+      }
+      logDebug("""
+        Finished processing %d completed containers.
+        Current number of workers running: %d,
+        releasedContainerList: %s,
+        pendingReleaseContainers: %s
+        """.format(
+          completedContainers.size,
+          numWorkersRunning.get(),
+          releasedContainerList,
+          pendingReleaseContainers))
+    }
+  }
+
+  /**
+   * Builds one rack-level ResourceRequest for every rack that contains at least one of the
+   * hosts named in `hostContainers`. The container count requested for a rack is the sum of
+   * the counts requested for its hosts.
+   *
+   * Hosts whose rack cannot be resolved (lookupRack returns null) contribute nothing.
+   *
+   * @param hostContainers host-level requests; none of them may target ANY_HOST
+   * @return the rack-level requests, one per resolved rack
+   */
+  def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
+    // Accumulate the number of requested containers per rack.
+    val perRackTotals = new HashMap[String, Int]()
+
+    hostContainers.foreach { hostRequest =>
+      val hostName = hostRequest.getHostName
+      val wanted = hostRequest.getNumContainers
+      assert(YarnAllocationHandler.ANY_HOST != hostName)
+
+      val rackName = YarnAllocationHandler.lookupRack(conf, hostName)
+      if (rackName != null) {
+        perRackTotals.put(rackName, perRackTotals.getOrElse(rackName, 0) + wanted)
+      }
+    }
+
+    // Emit one RACK request per rack, following the map's iteration order.
+    val rackRequests = new ArrayBuffer[ResourceRequest](perRackTotals.size)
+    perRackTotals.foreach { case (rackName, total) =>
+      rackRequests += createResourceRequest(
+        AllocationType.RACK, rackName, total, YarnAllocationHandler.PRIORITY)
+    }
+
+    rackRequests.toList
+  }
+
+  /**
+   * Number of containers currently recorded as allocated on `host`; 0 when the host has no
+   * entry. Reads the map while holding the lock on allocatedHostToContainersMap.
+   */
+  def allocatedContainersOnHost(host: String): Int = {
+    allocatedHostToContainersMap.synchronized {
+      allocatedHostToContainersMap.getOrElse(host, Set()).size
+    }
+  }
+
+  /**
+   * Number of containers currently recorded as allocated on `rack`; 0 when the rack has no
+   * entry. Reads the count while holding the lock on allocatedHostToContainersMap.
+   */
+  def allocatedContainersOnRack(rack: String): Int = {
+    allocatedHostToContainersMap.synchronized {
+      allocatedRackCount.getOrElse(rack, 0)
+    }
+  }
+
+  /**
+   * Issues one allocate call to the ResourceManager.
+   *
+   * When `numWorkers` is not positive or there are no host preferences, a single ANY request is
+   * sent. Otherwise the call carries a HOST request for every preferred host that still needs
+   * containers, the matching RACK requests, and a final ANY request for `numWorkers`.
+   * Containers queued for release are attached to the same call.
+   *
+   * @param numWorkers number of worker containers to ask for
+   * @return the ResourceManager's response to the allocate call
+   */
+  private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
+
+    val resourceRequests: List[ResourceRequest] =
+      if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
+        // default.
+        logDebug(
+          "numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
+        List(
+          createResourceRequest(
+            AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
+      } else {
+        // Request every preferred host that is still short of containers, then the racks of
+        // those hosts, then fall back to the default allocation policy for numWorkers.
+        val hostRequests = new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
+        preferredHostToCount.foreach { case (candidateHost, candidateCount) =>
+          val stillNeeded = candidateCount - allocatedContainersOnHost(candidateHost)
+          if (stillNeeded > 0) {
+            hostRequests += createResourceRequest(
+              AllocationType.HOST, candidateHost, stillNeeded, YarnAllocationHandler.PRIORITY)
+          }
+        }
+
+        val rackRequests: List[ResourceRequest] = createRackResourceRequests(hostRequests.toList)
+        val anyRequest: ResourceRequest = createResourceRequest(
+          AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)
+
+        val combined =
+          new ArrayBuffer[ResourceRequest](hostRequests.size + rackRequests.size + 1)
+        combined ++= hostRequests
+        combined ++= rackRequests
+        combined += anyRequest
+        combined.toList
+      }
+
+    val req = Records.newRecord(classOf[AllocateRequest])
+    req.setResponseId(lastResponseId.incrementAndGet)
+    req.setApplicationAttemptId(appAttemptId)
+    req.addAllAsks(resourceRequests)
+
+    // Drains the shared release list and marks those containers as pending release.
+    val toRelease = createReleasedContainerList()
+    req.addAllReleases(toRelease)
+
+    if (numWorkers > 0) {
+      logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
+        workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+    } else {
+      logDebug("Empty allocation req ..  release : " + toRelease)
+    }
+
+    resourceRequests.foreach { request =>
+      logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
+        format(
+          request.getHostName,
+          request.getNumContainers,
+          request.getPriority,
+          request.getCapability))
+    }
+    resourceManager.allocate(req)
+  }
+
+
+  /**
+   * Creates a ResourceRequest of the given allocation type.
+   *
+   * If a hostname is specified we need at least two requests - node local and rack local - and
+   * there must be a third, ANY, request which is handled specially. This method only builds
+   * the single request matching `requestType`.
+   *
+   * @param requestType HOST, RACK or ANY
+   * @param resource    host name for HOST, rack name for RACK; ignored for ANY
+   * @param numWorkers  number of containers to request
+   * @param priority    priority to set on the request
+   * @throws IllegalArgumentException for any other request type
+   */
+  private def createResourceRequest(
+    requestType: AllocationType.AllocationType, 
+    resource:String,
+    numWorkers: Int,
+    priority: Int): ResourceRequest = {
+
+    requestType match {
+      case AllocationType.HOST =>
+        assert(YarnAllocationHandler.ANY_HOST != resource)
+        val nodeLocalRequest = createResourceRequestImpl(resource, numWorkers, priority)
+        // Add to host->rack mapping
+        YarnAllocationHandler.populateRackInfo(conf, resource)
+        nodeLocalRequest
+
+      case AllocationType.RACK =>
+        createResourceRequestImpl(resource, numWorkers, priority)
+
+      case AllocationType.ANY =>
+        createResourceRequestImpl(YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+
+      case _ =>
+        throw new IllegalArgumentException("Unexpected/unsupported request type: " + requestType)
+    }
+  }
+
+  /**
+   * Builds a ResourceRequest for `hostname` asking for `numWorkers` containers (negative
+   * values are clamped to 0) at the given priority. Each container is sized as workerMemory
+   * plus YarnAllocationHandler.MEMORY_OVERHEAD.
+   */
+  private def createResourceRequestImpl(
+    hostname:String,
+    numWorkers: Int,
+    priority: Int): ResourceRequest = {
+
+    val request = Records.newRecord(classOf[ResourceRequest])
+
+    // There probably is some overhead here, let's reserve a bit more memory.
+    val capability = Records.newRecord(classOf[Resource])
+    capability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    request.setCapability(capability)
+
+    val requestPriority = Records.newRecord(classOf[Priority])
+    requestPriority.setPriority(priority)
+    request.setPriority(requestPriority)
+
+    request.setHostName(hostname)
+    request.setNumContainers(math.max(numWorkers, 0))
+    request
+  }
+
+  /**
+   * Takes a snapshot of releasedContainerList, removes the snapshotted ids from it and records
+   * each of them in pendingReleaseContainers.
+   *
+   * @return the container ids that were moved; empty when nothing was queued for release
+   */
+  def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
+
+    val drained = new ArrayBuffer[ContainerId](1)
+    // Iterator on COW list ...
+    releasedContainerList.iterator().foreach { containerId =>
+      drained += containerId
+    }
+
+    // Remove from the original list.
+    if (drained.nonEmpty) {
+      releasedContainerList.removeAll(drained)
+      drained.foreach { containerId => pendingReleaseContainers.put(containerId, true) }
+      logInfo("Releasing " + drained.size + " containers. pendingReleaseContainers : " + 
+        pendingReleaseContainers)
+    }
+
+    drained
+  }
+}
+
+/**
+ * Companion of YarnAllocationHandler: shared constants, factory methods and a process-wide
+ * cache of host -> rack (and rack -> hosts) mappings obtained from RackResolver.
+ */
+object YarnAllocationHandler {
+
+  // Host name used for requests that are not tied to a particular host or rack.
+  val ANY_HOST = "*"
+  // All requests are issued with same priority : we do not (yet) have any distinction between 
+  // request types (like map/reduce in hadoop for example)
+  val PRIORITY = 1
+
+  // Additional memory overhead - in mb
+  val MEMORY_OVERHEAD = 384
+
+  // Host to rack map - saved from allocation requests
+  // We are expecting this not to change.
+  // Note that it is possible for this to change : and RM will indicate that to us via update 
+  // response to allocate. But we are punting on handling that for now.
+  private val hostToRack = new ConcurrentHashMap[String, String]()
+  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+
+  /** Creates an allocator without any host or rack preferences. */
+  def newAllocator(
+    conf: Configuration,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments,
+    sparkConf: SparkConf): YarnAllocationHandler = {
+
+    new YarnAllocationHandler(
+      conf,
+      resourceManager,
+      appAttemptId,
+      args.numWorkers, 
+      args.workerMemory,
+      args.workerCores,
+      Map[String, Int](),
+      Map[String, Int](),
+      sparkConf)
+  }
+
+  /**
+   * Creates an allocator whose host/rack preferences are derived from `map` (host -> splits
+   * located on that host). Worker count, memory and cores are taken from `args`.
+   */
+  def newAllocator(
+    conf: Configuration,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments,
+    map: collection.Map[String, collection.Set[SplitInfo]],
+    sparkConf: SparkConf): YarnAllocationHandler = {
+
+    val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
+    new YarnAllocationHandler(
+      conf,
+      resourceManager,
+      appAttemptId,
+      args.numWorkers, 
+      args.workerMemory,
+      args.workerCores,
+      hostToCount,
+      rackToCount,
+      sparkConf)
+  }
+
+  /**
+   * Creates an allocator whose host/rack preferences are derived from `map` (host -> splits
+   * located on that host), with worker count, memory and cores given explicitly.
+   */
+  def newAllocator(
+    conf: Configuration,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    maxWorkers: Int,
+    workerMemory: Int,
+    workerCores: Int,
+    map: collection.Map[String, collection.Set[SplitInfo]],
+    sparkConf: SparkConf): YarnAllocationHandler = {
+
+    val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
+
+    new YarnAllocationHandler(
+      conf,
+      resourceManager,
+      appAttemptId,
+      maxWorkers,
+      workerMemory,
+      workerCores,
+      hostToCount,
+      rackToCount,
+      sparkConf)
+  }
+
+  /**
+   * Turns the split-location map into two weight maps: host -> number of splits on that host,
+   * and rack -> number of splits on hosts of that rack. Hosts whose rack cannot be resolved
+   * only contribute to the host map. A null `input` yields two empty maps.
+   *
+   * @return (host to count, rack to count)
+   */
+  private def generateNodeToWeight(
+    conf: Configuration,
+    input: collection.Map[String, collection.Set[SplitInfo]]) :
+  // host to count, rack to count
+  (Map[String, Int], Map[String, Int]) = {
+
+    if (input == null) return (Map[String, Int](), Map[String, Int]())
+
+    val hostToCount = new HashMap[String, Int]
+    val rackToCount = new HashMap[String, Int]
+
+    for ((host, splits) <- input) {
+      val hostCount = hostToCount.getOrElse(host, 0)
+      hostToCount.put(host, hostCount + splits.size)
+
+      val rack = lookupRack(conf, host)
+      if (rack != null){
+        // Key by rack (not host) so that hosts sharing a rack accumulate into one entry.
+        val rackCount = rackToCount.getOrElse(rack, 0)
+        rackToCount.put(rack, rackCount + splits.size)
+      }
+    }
+
+    (hostToCount.toMap, rackToCount.toMap)
+  }
+
+  /**
+   * Returns the rack of `host`, resolving and caching it first when it is not cached yet.
+   * Returns null when no rack is cached for the host after the resolution attempt.
+   */
+  def lookupRack(conf: Configuration, host: String): String = {
+    // containsKey, not contains: ConcurrentHashMap.contains is a legacy method that searches
+    // the map's values, so it never answers whether this host is already cached.
+    if (!hostToRack.containsKey(host)) populateRackInfo(conf, host)
+    hostToRack.get(host)
+  }
+
+  /**
+   * Returns an immutable snapshot of the hosts cached for `rack`, or None when nothing has been
+   * cached for that rack.
+   */
+  def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
+    val set = rackToHostSet.get(rack)
+    if (set == null) return None
+
+    // No better way to get a Set[String] from JSet ?
+    val convertedSet: collection.mutable.Set[String] = set
+    Some(convertedSet.toSet)
+  }
+
+  /**
+   * Resolves the rack of `hostname` through RackResolver and records it in both caches
+   * (host -> rack and rack -> hosts). Does nothing when the host is already cached; failed
+   * resolutions are not cached, so they are attempted again on the next call.
+   */
+  def populateRackInfo(conf: Configuration, hostname: String) {
+    Utils.checkHost(hostname)
+
+    if (!hostToRack.containsKey(hostname)) {
+      // If there are repeated failures to resolve, add to an ignore list ?
+      val rackInfo = RackResolver.resolve(conf, hostname)
+      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+        val rack = rackInfo.getNetworkLocation
+        hostToRack.put(hostname, rack)
+        if (! rackToHostSet.containsKey(rack)) {
+          rackToHostSet.putIfAbsent(rack,
+            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+        }
+        rackToHostSet.get(rack).add(hostname)
+
+        // TODO(harvey): Figure out this comment...
+        // Since RackResolver caches, we are disabling this for now ...
+      } /* else {
+        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
+        hostToRack.put(hostname, null)
+      } */
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 3300a46..7c145ef 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -89,16 +89,16 @@
 
   <profiles>
     <profile>
-      <id>hadoop2-yarn</id>
+      <id>yarn-alpha</id>
       <modules>
-        <module>2.0</module>
+        <module>alpha</module>
       </modules>
     </profile>
 
     <profile>
-      <id>hadoop2.2-yarn</id>
+      <id>yarn</id>
       <modules>
-        <module>2.2</module>
+        <module>stable</module>
       </modules>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/stable/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
new file mode 100644
index 0000000..45a1431
--- /dev/null
+++ b/yarn/stable/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-yarn-aggregator_2.10</artifactId>
+    <version>0.9.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-yarn_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project YARN Stable API</name>
+  <url>http://spark.incubator.apache.org/</url>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+           <id>add-scala-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/main/scala</source>
+                <source>../common/src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-scala-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/test/scala</source>
+                <source>../common/src/test/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <tasks>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
+                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">  
+                  <condition>
+                    <not>
+                      <or>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
+                      </or>
+                    </not>
+                  </condition>
+                </fail>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <SPARK_HOME>${basedir}/../..</SPARK_HOME>
+            <SPARK_TESTING>1</SPARK_TESTING>
+            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+          </environmentVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ebdfa6bb/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..7c32e0a
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.Socket
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.util.Utils
+
+
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+
+  private var rpc: YarnRPC = YarnRPC.create(conf)
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  private var appAttemptId: ApplicationAttemptId = _
+  private var userThread: Thread = _
+  private val fs = FileSystem.get(yarnConf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var isFinished: Boolean = false
+  private var uiAddress: String = _
+  private val maxAppAttempts: Int = conf.getInt(
+    YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  private var isLastAMRetry: Boolean = true
+  private var amClient: AMRMClient[ContainerRequest] = _
+
+  private val sparkConf = new SparkConf()
+  // Default to numWorkers * 2, with minimum of 3
+  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3))
+
+  /**
+   * Entry point of the ApplicationMaster: configures local dirs and the UI port, starts the
+   * AMRMClient, launches the user class in its own thread, registers with the ResourceManager
+   * once the SparkContext is available, allocates the workers, waits for the user thread to
+   * finish and then exits the JVM with status 0.
+   */
+  def run() {
+    // Setup the directories so things go to YARN approved directories rather
+    // than user specified and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    // set the web ui port to be ephemeral for yarn so we don't conflict with
+    // other spark processes running on the same box
+    System.setProperty("spark.ui.port", "0")
+
+    // Use priority 30 as it's higher than HDFS. It's same priority as MapReduce is using.
+    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+
+    appAttemptId = getApplicationAttemptId()
+    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(yarnConf)
+    amClient.start()
+
+    // Workaround until hadoop moves to something which has
+    // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+    // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+
+    ApplicationMaster.register(this)
+
+    // Start the user's JAR
+    userThread = startUserClass()
+
+    // This is a bit hacky, but we need to wait until the spark.driver.port property has
+    // been set by the Thread executing the user class.
+    waitForSparkContextInitialized()
+
+    // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
+    // The registration response is not used here, so it is not bound to a local.
+    registerApplicationMaster()
+
+    // Allocate all containers
+    allocateWorkers()
+
+    // Wait for the user class to Finish
+    userThread.join()
+
+    System.exit(0)
+  }
+
+  /**
+   * Get the Yarn approved local directories.
+   *
+   * Hadoop 0.23 and 2.x use different environment variable names for the local dirs
+   * (LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X), so both are checked; YARN_LOCAL_DIRS wins
+   * whenever it is set. Throws when the chosen value is missing or empty.
+   */
+  private def getLocalDirs(): String = {
+    val fromEnv = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+
+    fromEnv match {
+      case Some(dirs) if !dirs.isEmpty() => dirs
+      case _ => throw new Exception("Yarn Local dirs can't be empty")
+    }
+  }
+
+  /**
+   * Derives the ApplicationAttemptId from the container id found in the CONTAINER_ID
+   * environment variable, and logs it.
+   */
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val containerIdString =
+      System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val attemptId = ConverterUtils.toContainerId(containerIdString).getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + attemptId)
+    attemptId
+  }
+
+  /**
+   * Registers this ApplicationMaster through the AMRMClient, passing the local host name,
+   * port 0 and the current uiAddress.
+   */
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    val appHostName = Utils.localHostName()
+    val appHostPort = 0
+    amClient.registerApplicationMaster(appHostName, appHostPort, uiAddress)
+  }
+
+  /**
+   * Starts the `main` method of the user class (args.userClass) in a new thread and returns
+   * that thread. When `main` returns normally the application is finished as SUCCEEDED; when it
+   * throws, it is finished as FAILED.
+   */
+  private def startUserClass(): Thread  = {
+    logInfo("Starting the user JAR in a separate Thread")
+    val userClass = Class.forName(
+      args.userClass,
+      false /* initialize */,
+      Thread.currentThread.getContextClassLoader)
+    val mainMethod = userClass.getMethod("main", classOf[Array[String]])
+
+    val runner = new Thread {
+      override def run() {
+        var completedNormally = false
+        try {
+          // Hand the user class its own copy of the arguments.
+          val mainArgs: Array[String] = new Array[String](args.userArgs.size)
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+          mainMethod.invoke(null, mainArgs)
+          // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
+          // userThread will stop here unless it has uncaught exception thrown out
+          // It need shutdown hook to set SUCCEEDED
+          completedNormally = true
+        } finally {
+          logDebug("finishing main")
+          isLastAMRetry = true
+          val finalStatus =
+            if (completedNormally) FinalApplicationStatus.SUCCEEDED
+            else FinalApplicationStatus.FAILED
+          ApplicationMaster.this.finishApplicationMaster(finalStatus)
+        }
+      }
+    }
+    runner.start()
+    runner
+  }
+
+  /**
+   * Waits for the user class to publish its SparkContext through
+   * ApplicationMaster.sparkContextRef and then creates the YarnAllocationHandler.
+   *
+   * The wait is bounded: at most "spark.yarn.applicationMaster.waitTries" (default 10) waits
+   * of up to 10000 ms each. If the context shows up, the allocator is built with its preferred
+   * node locations and its configuration. If it does not, a warning is logged and the
+   * allocator is built without location preferences, using this ApplicationMaster's own
+   * SparkConf (there is no SparkContext to read one from).
+   *
+   * This needs to happen before allocateWorkers().
+   */
+  private def waitForSparkContextInitialized() {
+    logInfo("Waiting for Spark context initialization")
+    try {
+      var sparkContext: SparkContext = null
+      ApplicationMaster.sparkContextRef.synchronized {
+        var numTries = 0
+        val waitTime = 10000L
+        val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
+        while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
+          logInfo("Waiting for Spark context initialization ... " + numTries)
+          numTries = numTries + 1
+          ApplicationMaster.sparkContextRef.wait(waitTime)
+        }
+        sparkContext = ApplicationMaster.sparkContextRef.get()
+        assert(sparkContext != null || numTries >= maxNumTries)
+
+        if (sparkContext != null) {
+          uiAddress = sparkContext.ui.appUIAddress
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(
+            yarnConf,
+            amClient,
+            appAttemptId,
+            args,
+            sparkContext.preferredNodeLocationData,
+            sparkContext.getConf)
+        } else {
+          logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
+            format(numTries * waitTime, maxNumTries))
+          // sparkContext is null on this path, so it must not be dereferenced: fall back to
+          // the SparkConf owned by this ApplicationMaster.
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(
+            yarnConf,
+            amClient,
+            appAttemptId,
+            args,
+            sparkConf)
+        }
+      }
+    } finally {
+      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT :
+      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+  }
+
+  /**
+   * Requests args.numWorkers containers and polls the allocator every 100 ms until that many
+   * workers are running or the user thread has exited. Afterwards, if the user thread is still
+   * alive, starts the reporter thread that keeps reporting progress to the ResourceManager.
+   */
+  private def allocateWorkers() {
+    try {
+      logInfo("Allocating " + args.numWorkers + " workers.")
+      // Wait until all containers have finished
+      // TODO: This is a bit ugly. Can we make it nicer?
+      // TODO: Handle container failure
+      yarnAllocator.addResourceRequests(args.numWorkers)
+
+      // Exits the loop if the user thread exits.
+      def stillWaiting: Boolean =
+        yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive
+
+      while (stillWaiting) {
+        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+          finishApplicationMaster(FinalApplicationStatus.FAILED,
+            "max number of worker failures reached")
+        }
+        yarnAllocator.allocateResources()
+        ApplicationMaster.incrementAllocatorLoop(1)
+        Thread.sleep(100)
+      }
+    } finally {
+      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+    logInfo("All workers have launched.")
+
+    // Launch a progress reporter thread, else the app will get killed after expiration
+    // (def: 10mins) timeout.
+    if (userThread.isAlive) {
+      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+      // we want to be reasonably responsive without causing too many requests to RM.
+      val schedulerInterval =
+        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+      // The reporting interval must be <= timeoutInterval / 2.
+      launchReporterThread(math.min(timeoutInterval / 2, schedulerInterval))
+    }
+  }
+
+  /**
+   * Starts a daemon thread that, while the user thread is alive, re-requests containers for
+   * workers that are neither running nor pending, sends progress, and then sleeps.
+   *
+   * @param _sleepTime pause between iterations in milliseconds; values <= 0 are treated as 0
+   * @return the started reporter thread
+   */
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = math.max(_sleepTime, 0L)
+
+    val reporter = new Thread {
+      override def run(): Unit = {
+        while (userThread.isAlive) {
+          val failureLimitHit = yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures
+          if (failureLimitHit) {
+            finishApplicationMaster(FinalApplicationStatus.FAILED,
+              "max number of worker failures reached")
+          }
+
+          // Workers that were requested but are neither running nor awaiting allocation.
+          val wanted = args.numWorkers
+          val running = yarnAllocator.getNumWorkersRunning
+          val pending = yarnAllocator.getNumPendingAllocate
+          val missingWorkerCount = wanted - running - pending
+          if (missingWorkerCount > 0) {
+            logInfo("Allocating %d containers to make up for (potentially) lost containers".
+              format(missingWorkerCount))
+            yarnAllocator.addResourceRequests(missingWorkerCount)
+          }
+
+          sendProgress()
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // Setting to daemon status, though this is usually not a good idea.
+    reporter.setDaemon(true)
+    reporter.start()
+    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    reporter
+  }
+
+  /** Sends a progress update by issuing an allocate call on `yarnAllocator`. */
+  private def sendProgress(): Unit = {
+    logDebug("Sending progress")
+    // Progress is simulated with an allocate request that asks for no nodes.
+    yarnAllocator.allocateResources()
+  }
+
+  /*
+  def printContainers(containers: List[Container]) = {
+    for (container <- containers) {
+      logInfo("Launching shell command on a new container."
+        + ", containerId=" + container.getId()
+        + ", containerNode=" + container.getNodeId().getHost()
+        + ":" + container.getNodeId().getPort()
+        + ", containerNodeURI=" + container.getNodeHttpAddress()
+        + ", containerState" + container.getState()
+        + ", containerResourceMemory"
+        + container.getResource().getMemory())
+    }
+  }
+  */
+
+  /**
+   * Unregisters this ApplicationMaster through `amClient`, at most once.
+   *
+   * The first caller flips `isFinished` while holding this instance's lock; every later call
+   * returns without doing anything.
+   *
+   * @param status final application status passed to the unregister call
+   * @param diagnostics message forwarded as the application message of the unregister call;
+   *                    defaults to "" and a null value is treated as ""
+   */
+  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = ""): Unit = {
+    // Test-and-set under the lock so that exactly one caller proceeds to unregister.
+    val alreadyFinished = synchronized {
+      val wasFinished = isFinished
+      isFinished = true
+      wasFinished
+    }
+
+    if (!alreadyFinished) {
+      logInfo("finishApplicationMaster with " + status)
+      // Forward the caller's diagnostics instead of discarding them, so the reason for a
+      // failure is part of the unregister request.
+      val appMessage = if (diagnostics == null) "" else diagnostics
+      // Set tracking URL to empty since we don't have a history server.
+      amClient.unregisterApplicationMaster(status, appMessage, "" /* appTrackingUrl */)
+    }
+  }
+
+  /**
+   * Clean up the staging directory.
+   *
+   * Does nothing when `spark.yarn.preserve.staging.files` is set to true. Otherwise deletes the
+   * directory named by the SPARK_YARN_STAGING_DIR environment variable via `fs.delete`
+   * (passing `true` as its second argument). An unset variable is logged as an error and an
+   * IOException raised during the cleanup is logged rather than propagated.
+   */
+  private def cleanupStagingDir(): Unit = {
+    // Kept outside the try block so the failure message can include the path.
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        // Inspect the raw environment value before building the Path: a null check on the
+        // result of `new Path(...)` can never fire, and constructing a Path from null throws.
+        Option(System.getenv("SPARK_YARN_STAGING_DIR")) match {
+          case None =>
+            logError("Staging directory is null")
+          case Some(stagingDir) =>
+            stagingDirPath = new Path(stagingDir)
+            logInfo("Deleting staging directory " + stagingDirPath)
+            fs.delete(stagingDirPath, true)
+        }
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  /**
+   * The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+   * Removes the staging directory of `appMaster`, but only on its last AM retry.
+   */
+  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+    override def run(): Unit = {
+      logInfo("AppMaster received a signal.")
+      // The staging dir has to be cleaned up before HDFS is shut down, and it must not be
+      // deleted until this is the last AM.
+      val lastAttempt = appMaster.isLastAMRetry
+      if (lastAttempt) {
+        appMaster.cleanupStagingDir()
+      }
+    }
+  }
+}
+
+/**
+ * Companion holding the state shared between the user thread and the allocator thread: the
+ * registered ApplicationMaster instances, the published SparkContext and the allocator loop
+ * counter, plus the `main` entry point.
+ */
+object ApplicationMaster {
+  // Number of times to wait for the allocator loop to complete.
+  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
+  // This is to ensure that we have reasonable number of containers before we start
+  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+  // optimal as more containers are available. Might need to handle this better.
+  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
+  // Every ApplicationMaster added through register(); iterated by the shutdown hook below.
+  private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+
+  // Holds the SparkContext once sparkContextInitialized() has published it; null until then.
+  // Also used as the monitor that is notified when the context is published.
+  val sparkContextRef: AtomicReference[SparkContext] =
+    new AtomicReference[SparkContext](null /* initialValue */)
+
+  // Allocator loop counter; also used as the monitor sparkContextInitialized() waits on.
+  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+
+  /**
+   * Adds `by` to the allocator loop counter and wakes up waiters once the counter has reached
+   * ALLOCATOR_LOOP_WAIT_COUNT.
+   *
+   * @param by amount to add to the counter
+   */
+  def incrementAllocatorLoop(by: Int): Unit = {
+    // Compare the value AFTER the increment: callers that add ALLOCATOR_LOOP_WAIT_COUNT in one
+    // step expect the waiter to be released by that very call.
+    val count = yarnAllocatorLoop.addAndGet(by)
+    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
+      yarnAllocatorLoop.synchronized {
+        // to wake threads off wait ...
+        yarnAllocatorLoop.notifyAll()
+      }
+    }
+  }
+
+  /**
+   * Records an ApplicationMaster so the shutdown hook can finish it.
+   *
+   * @param master instance to remember
+   */
+  def register(master: ApplicationMaster): Unit = {
+    applicationMasters.add(master)
+  }
+
+  // TODO(harvey): See whether this should be discarded - it isn't used anywhere atm...
+  /**
+   * Publishes `sc` as the application's SparkContext (first caller wins), installs a shutdown
+   * hook for it, and then blocks until the allocator loop counter has reached
+   * ALLOCATOR_LOOP_WAIT_COUNT.
+   *
+   * @param sc the freshly created SparkContext
+   * @return true if this call stored `sc`, false if a context had already been published
+   */
+  def sparkContextInitialized(sc: SparkContext): Boolean = {
+    var modified = false
+    sparkContextRef.synchronized {
+      modified = sparkContextRef.compareAndSet(null, sc)
+      // Wake up any thread waiting on sparkContextRef for the context to appear.
+      sparkContextRef.notifyAll()
+    }
+
+    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+    // System.exit.
+    // Should not really have to do this, but it helps YARN to evict resources earlier.
+    // Not to mention, prevent the Client from declaring failure even though we exited properly.
+    // Note that this will unfortunately not properly clean up the staging files because it gets
+    // called too late, after the filesystem is already shutdown.
+    if (modified) {
+      Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+        // This not only logs, but also ensures that the log system is initialized for this
+        // instance when we are actually 'run'-ing.
+        logInfo("Adding shutdown hook for context " + sc)
+        override def run() {
+          logInfo("Invoking sc stop from shutdown hook")
+          sc.stop()
+          // Best case ...
+          for (master <- applicationMasters) {
+            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          }
+        }
+      } )
+    }
+
+    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+    // The loop ends as soon as the counter REACHES the threshold, matching the notify condition
+    // in incrementAllocatorLoop; the timed wait re-checks the counter every second regardless.
+    yarnAllocatorLoop.synchronized {
+      while (yarnAllocatorLoop.get() < ALLOCATOR_LOOP_WAIT_COUNT) {
+        yarnAllocatorLoop.wait(1000L)
+      }
+    }
+    modified
+  }
+
+  /**
+   * Entry point: parses the command line arguments and runs a new ApplicationMaster.
+   *
+   * @param argStrings raw command line arguments
+   */
+  def main(argStrings: Array[String]): Unit = {
+    val args = new ApplicationMasterArguments(argStrings)
+    new ApplicationMaster(args).run()
+  }
+}