Posted to commits@spark.apache.org by an...@apache.org on 2014/12/09 20:02:52 UTC

[2/6] spark git commit: SPARK-4338. [YARN] Ditch yarn-alpha.

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
new file mode 100644
index 0000000..b32e157
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+
+object AllocationType extends Enumeration {
+  type AllocationType = Value
+  val HOST, RACK, ANY = Value
+}
+
+// TODO:
+// Too many params.
+// Needs to be thread-safe.
+// Needs refactoring to make it cleaner: right now all computation is reactive; it should be
+// made more proactive and decoupled.
+
+// Note that right now, we assume all node requests are uniform in terms of capabilities and
+// priority. Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/
+// for more info on how containers are requested.
+
+/**
+ * Common code for the Yarn container allocator. Contains all the version-agnostic code to
+ * manage container allocation for a running Spark application.
+ */
+private[yarn] abstract class YarnAllocator(
+    conf: Configuration,
+    sparkConf: SparkConf,
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments,
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+    securityMgr: SecurityManager)
+  extends Logging {
+
+  import YarnAllocator._
+
+  // These three are complementary data structures, all locked on allocatedHostToContainersMap:
+  // allocatedHostToContainersMap: running containers, keyed as host -> Set<ContainerId>
+  // allocatedContainerToHostMap: container -> host mapping.
+  private val allocatedHostToContainersMap =
+    new HashMap[String, collection.mutable.Set[ContainerId]]()
+
+  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+
+  // allocatedRackCount is updated ONLY when allocation happens (and decremented when a container
+  // on that rack completes).
+  // As with the two data structures above, it is tightly coupled with them and must be locked on
+  // allocatedHostToContainersMap.
+  private val allocatedRackCount = new HashMap[String, Int]()
+
+  // Containers to be released in next request to RM
+  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
+
+  // Number of container requests that have been sent to, but not yet allocated by, the
+  // ResourceManager.
+  private val numPendingAllocate = new AtomicInteger()
+  private val numExecutorsRunning = new AtomicInteger()
+  // Used to generate a unique id per executor
+  private val executorIdCounter = new AtomicInteger()
+  private val numExecutorsFailed = new AtomicInteger()
+
+  private var maxExecutors = args.numExecutors
+
+  // Keep track of which container is running which executor, so we can remove executors later.
+  private val executorIdToContainer = new HashMap[String, Container]
+
+  protected val executorMemory = args.executorMemory
+  protected val executorCores = args.executorCores
+  protected val (preferredHostToCount, preferredRackToCount) =
+    generateNodeToWeight(conf, preferredNodes)
+
+  // Additional memory overhead, in MB.
+  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+  private val launcherPool = new ThreadPoolExecutor(
+    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+    1, TimeUnit.MINUTES,
+    new LinkedBlockingQueue[Runnable](),
+    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+  launcherPool.allowCoreThreadTimeOut(true)
+
+  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+
+  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+
+  /**
+   * Request as many executors from the ResourceManager as needed to reach the desired total.
+   * This takes into account executors already running or pending.
+   */
+  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+    if (requestedTotal > currentTotal) {
+      maxExecutors += (requestedTotal - currentTotal)
+      // We need to call `allocateResources` here to avoid the following race condition:
+      // If we request executors twice before `allocateResources` is called, then we will end up
+      // double counting the number requested because `numPendingAllocate` is not updated yet.
+      allocateResources()
+    } else {
+      logInfo(s"Not allocating more executors because there are already $currentTotal " +
+        s"(application requested $requestedTotal total)")
+    }
+  }
+
+  /**
+   * Request that the ResourceManager release the container running the specified executor.
+   */
+  def killExecutor(executorId: String): Unit = synchronized {
+    if (executorIdToContainer.contains(executorId)) {
+      val container = executorIdToContainer.remove(executorId).get
+      internalReleaseContainer(container)
+      numExecutorsRunning.decrementAndGet()
+      maxExecutors -= 1
+      assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+    } else {
+      logWarning(s"Attempted to kill unknown executor $executorId!")
+    }
+  }
+
+  /**
+   * Allocate missing containers based on the number of executors currently pending and running.
+   *
+   * This method prioritizes the allocated container responses from the RM based on node and
+   * rack locality. Additionally, it releases any extra containers allocated for this application
+   * that are not needed. This must be synchronized because variables read in this block are
+   * mutated by other methods.
+   */
+  def allocateResources(): Unit = synchronized {
+    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+
+    // This is needed by the alpha API; capture it here since we add to numPendingAllocate below.
+    val executorsPending = numPendingAllocate.get()
+    if (missing > 0) {
+      val totalExecutorMemory = executorMemory + memoryOverhead
+      numPendingAllocate.addAndGet(missing)
+      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+        s"memory including $memoryOverhead MB overhead")
+    } else {
+      logDebug("Empty allocation request ...")
+    }
+
+    val allocateResponse = allocateContainers(missing, executorsPending)
+    val allocatedContainers = allocateResponse.getAllocatedContainers()
+
+    if (allocatedContainers.size > 0) {
+      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
+
+      if (numPendingAllocateNow < 0) {
+        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
+      }
+
+      logDebug("""
+        Allocated containers: %d
+        Current executor count: %d
+        Containers released: %s
+        Cluster resources: %s
+        """.format(
+          allocatedContainers.size,
+          numExecutorsRunning.get(),
+          releasedContainers,
+          allocateResponse.getAvailableResources))
+
+      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      for (container <- allocatedContainers) {
+        if (isResourceConstraintSatisfied(container)) {
+          // Add the accepted `container` to the host's list of already accepted,
+          // allocated containers
+          val host = container.getNodeId.getHost
+          val containersForHost = hostToContainers.getOrElseUpdate(host,
+            new ArrayBuffer[Container]())
+          containersForHost += container
+        } else {
+          // Release container, since it doesn't satisfy resource constraints.
+          internalReleaseContainer(container)
+        }
+      }
+
+      // Find the appropriate containers to use.
+      // TODO: Cleanup this group-by...
+      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      for (candidateHost <- hostToContainers.keySet) {
+        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+
+        val remainingContainersOpt = hostToContainers.get(candidateHost)
+        assert(remainingContainersOpt.isDefined)
+        var remainingContainers = remainingContainersOpt.get
+
+        if (requiredHostCount >= remainingContainers.size) {
+          // Since we have <= required containers, add all remaining containers to
+          // `dataLocalContainers`.
+          dataLocalContainers.put(candidateHost, remainingContainers)
+          // There are no more free containers remaining.
+          remainingContainers = null
+        } else if (requiredHostCount > 0) {
+          // Container list has more containers than we need for data locality.
+          // Split the list into two: one based on the data local container count,
+          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
+          // containers.
+          val (dataLocal, remaining) = remainingContainers.splitAt(
+            remainingContainers.size - requiredHostCount)
+          dataLocalContainers.put(candidateHost, dataLocal)
+
+          // Invariant: remainingContainers == remaining
+
+          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
+          // Add each container in `remaining` to list of containers to release. If we have an
+          // insufficient number of containers, then the next allocation cycle will reallocate
+          // (but won't treat it as data local).
+          // TODO(harvey): Rephrase this comment some more.
+          for (container <- remaining) internalReleaseContainer(container)
+          remainingContainers = null
+        }
+
+        // For rack local containers
+        if (remainingContainers != null) {
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+          if (rack != null) {
+            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+              rackLocalContainers.getOrElse(rack, List()).size
+
+            if (requiredRackCount >= remainingContainers.size) {
+              // Add all remaining containers to `dataLocalContainers` (keyed by rack).
+              dataLocalContainers.put(rack, remainingContainers)
+              remainingContainers = null
+            } else if (requiredRackCount > 0) {
+              // Container list has more containers than we need for rack locality.
+              // Split the list into two: one based on the rack local container count,
+              // (`remainingContainers.size` - `requiredRackCount`), and the other to hold remaining
+              // containers.
+              val (rackLocal, remaining) = remainingContainers.splitAt(
+                remainingContainers.size - requiredRackCount)
+              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+                new ArrayBuffer[Container]())
+
+              existingRackLocal ++= rackLocal
+
+              remainingContainers = remaining
+            }
+          }
+        }
+
+        if (remainingContainers != null) {
+          // Not all containers have been consumed - add them to the list of off-rack containers.
+          offRackContainers.put(candidateHost, remainingContainers)
+        }
+      }
+
+      // Now that we have split the containers into various groups, go through them in order:
+      // first host-local, then rack-local, and finally off-rack.
+      // Note that the list we create below tries to ensure that not all containers end up within
+      // a host if there is a sufficiently large number of hosts/containers.
+      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
+      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+
+      // Run each of the allocated containers.
+      for (container <- allocatedContainersToProcess) {
+        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+        val executorHostname = container.getNodeId.getHost
+        val containerId = container.getId
+
+        val executorMemoryOverhead = (executorMemory + memoryOverhead)
+        assert(container.getResource.getMemory >= executorMemoryOverhead)
+
+        if (numExecutorsRunningNow > maxExecutors) {
+          logInfo("""Ignoring container %s at host %s, since we already have the required number of
+            containers for it.""".format(containerId, executorHostname))
+          internalReleaseContainer(container)
+          numExecutorsRunning.decrementAndGet()
+        } else {
+          val executorId = executorIdCounter.incrementAndGet().toString
+          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+            SparkEnv.driverActorSystemName,
+            sparkConf.get("spark.driver.host"),
+            sparkConf.get("spark.driver.port"),
+            CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+          executorIdToContainer(executorId) = container
+
+          // To be safe, remove the container from `releasedContainers`.
+          releasedContainers.remove(containerId)
+
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
+          allocatedHostToContainersMap.synchronized {
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+              new HashSet[ContainerId]())
+
+            containerSet += containerId
+            allocatedContainerToHostMap.put(containerId, executorHostname)
+
+            if (rack != null) {
+              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+            }
+          }
+          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
+            driverUrl, executorHostname))
+          val executorRunnable = new ExecutorRunnable(
+            container,
+            conf,
+            sparkConf,
+            driverUrl,
+            executorId,
+            executorHostname,
+            executorMemory,
+            executorCores,
+            appAttemptId.getApplicationId.toString,
+            securityMgr)
+          launcherPool.execute(executorRunnable)
+        }
+      }
+      logDebug("""
+        Finished allocating %s containers (from %s originally).
+        Current number of executors running: %d,
+        Released containers: %s
+        """.format(
+          allocatedContainersToProcess,
+          allocatedContainers,
+          numExecutorsRunning.get(),
+          releasedContainers))
+    }
+
+    val completedContainers = allocateResponse.getCompletedContainersStatuses()
+    if (completedContainers.size > 0) {
+      logDebug("Completed %d containers".format(completedContainers.size))
+
+      for (completedContainer <- completedContainers) {
+        val containerId = completedContainer.getContainerId
+
+        if (releasedContainers.containsKey(containerId)) {
+          // YarnAllocationHandler already marked the container for release, so remove it from
+          // `releasedContainers`.
+          releasedContainers.remove(containerId)
+        } else {
+          // Decrement the number of executors running. The next iteration of
+          // the ApplicationMaster's reporting thread will take care of allocating a replacement.
+          numExecutorsRunning.decrementAndGet()
+          logInfo("Completed container %s (state: %s, exit status: %s)".format(
+            containerId,
+            completedContainer.getState,
+            completedContainer.getExitStatus))
+          // Hadoop 2.2.x added a ContainerExitStatus that we should switch to using.
+          // There are some exit statuses we shouldn't necessarily count against us, but for
+          // now this is OK, since none of the containers are expected to exit.
+          if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+            logWarning(memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              VMEM_EXCEEDED_PATTERN))
+          } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
+            logWarning(memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              PMEM_EXCEEDED_PATTERN))
+          } else if (completedContainer.getExitStatus != 0) {
+            logInfo("Container marked as failed: " + containerId +
+              ". Exit status: " + completedContainer.getExitStatus +
+              ". Diagnostics: " + completedContainer.getDiagnostics)
+            numExecutorsFailed.incrementAndGet()
+          }
+        }
+
+        allocatedHostToContainersMap.synchronized {
+          if (allocatedContainerToHostMap.containsKey(containerId)) {
+            val hostOpt = allocatedContainerToHostMap.get(containerId)
+            assert(hostOpt.isDefined)
+            val host = hostOpt.get
+
+            val containerSetOpt = allocatedHostToContainersMap.get(host)
+            assert(containerSetOpt.isDefined)
+            val containerSet = containerSetOpt.get
+
+            containerSet.remove(containerId)
+            if (containerSet.isEmpty) {
+              allocatedHostToContainersMap.remove(host)
+            } else {
+              allocatedHostToContainersMap.update(host, containerSet)
+            }
+
+            allocatedContainerToHostMap.remove(containerId)
+
+            // TODO: Move this part outside the synchronized block?
+            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+            if (rack != null) {
+              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+              if (rackCount > 0) {
+                allocatedRackCount.put(rack, rackCount)
+              } else {
+                allocatedRackCount.remove(rack)
+              }
+            }
+          }
+        }
+      }
+      logDebug("""
+        Finished processing %d completed containers.
+        Current number of executors running: %d,
+        Released containers: %s
+        """.format(
+          completedContainers.size,
+          numExecutorsRunning.get(),
+          releasedContainers))
+    }
+  }
+
+  protected def allocatedContainersOnHost(host: String): Int = {
+    var retval = 0
+    allocatedHostToContainersMap.synchronized {
+      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
+    }
+    retval
+  }
+
+  protected def allocatedContainersOnRack(rack: String): Int = {
+    var retval = 0
+    allocatedHostToContainersMap.synchronized {
+      retval = allocatedRackCount.getOrElse(rack, 0)
+    }
+    retval
+  }
+
+  private def isResourceConstraintSatisfied(container: Container): Boolean = {
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
+  }
+
+  // Converts the preferred node / split info map into host -> weight and rack -> weight maps.
+  private def generateNodeToWeight(
+      conf: Configuration,
+      input: collection.Map[String, collection.Set[SplitInfo]]
+    ): (Map[String, Int], Map[String, Int]) = {
+
+    if (input == null) {
+      return (Map[String, Int](), Map[String, Int]())
+    }
+
+    val hostToCount = new HashMap[String, Int]
+    val rackToCount = new HashMap[String, Int]
+
+    for ((host, splits) <- input) {
+      val hostCount = hostToCount.getOrElse(host, 0)
+      hostToCount.put(host, hostCount + splits.size)
+
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+      if (rack != null) {
+        val rackCount = rackToCount.getOrElse(rack, 0)
+        rackToCount.put(host, rackCount + splits.size)
+      }
+    }
+
+    (hostToCount.toMap, rackToCount.toMap)
+  }
+
+  private def internalReleaseContainer(container: Container) = {
+    releasedContainers.put(container.getId(), true)
+    releaseContainer(container)
+  }
+
+  /**
+   * Called to allocate containers in the cluster.
+   *
+   * @param count Number of containers to allocate.
+   *              If zero, should still contact RM (as a heartbeat).
+   * @param pending Number of containers pending allocate. Only used on alpha.
+   * @return Response to the allocation request.
+   */
+  protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
+
+  /** Called to release a previously allocated container. */
+  protected def releaseContainer(container: Container): Unit
+
+  /**
+   * Defines the interface for an allocate response from the RM. This is needed since the alpha
+   * and stable interfaces differ here in ways that cannot be fixed using other routes.
+   */
+  protected trait YarnAllocateResponse {
+
+    def getAllocatedContainers(): JList[Container]
+
+    def getAvailableResources(): Resource
+
+    def getCompletedContainersStatuses(): JList[ContainerStatus]
+
+  }
+
+}
+
+private object YarnAllocator {
+  val MEM_REGEX = "[0-9.]+ [KMG]B"
+  val PMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
+  val VMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+
+  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
+    val matcher = pattern.matcher(diagnostics)
+    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
+    ("Container killed by YARN for exceeding memory limits." + diag
+      + " Consider boosting spark.yarn.executor.memoryOverhead.")
+  }
+}
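
An aside on the numbers above: with the defaults defined in this file and in YarnSparkHadoopUtil
(MEMORY_OVERHEAD_FACTOR = 0.07, MEMORY_OVERHEAD_MIN = 384), a 4096 MB executor gets 384 MB of
overhead, so each container request asks for 4480 MB. The standalone sketch below, which mirrors
those constants rather than reusing the classes in the patch, reproduces that arithmetic and
exercises PMEM_EXCEEDED_PATTERN on an invented diagnostics string.

    import java.util.regex.Pattern

    object YarnAllocatorSketch {
      // Constants mirrored from YarnAllocator / YarnSparkHadoopUtil above.
      val MEMORY_OVERHEAD_FACTOR = 0.07
      val MEMORY_OVERHEAD_MIN = 384
      val MEM_REGEX = "[0-9.]+ [KMG]B"
      val PMEM_EXCEEDED_PATTERN =
        Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")

      def main(args: Array[String]): Unit = {
        // max(0.07 * 4096, 384) = max(286, 384) = 384 MB of overhead.
        val executorMemory = 4096
        val memoryOverhead =
          math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)
        println(s"Each container is requested with ${executorMemory + memoryOverhead} MB")

        // Invented diagnostics in the shape YARN reports for a pmem kill (exit status -104).
        val diagnostics = "Container killed: 4.5 GB of 4.4 GB physical memory used"
        val matcher = PMEM_EXCEEDED_PATTERN.matcher(diagnostics)
        if (matcher.find()) {
          println("Container killed by YARN for exceeding memory limits. " + matcher.group() +
            ". Consider boosting spark.yarn.executor.memoryOverhead.")
        }
      }
    }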

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
new file mode 100644
index 0000000..2510b9c
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.scheduler.SplitInfo
+
+/**
+ * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
+ * is used by Spark's AM.
+ */
+trait YarnRMClient {
+
+  /**
+   * Registers the application master with the RM.
+   *
+   * @param conf The Yarn configuration.
+   * @param sparkConf The Spark configuration.
+   * @param preferredNodeLocations Map with hints about where to allocate containers.
+   * @param uiAddress Address of the SparkUI.
+   * @param uiHistoryAddress Address of the application on the History Server.
+   */
+  def register(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      preferredNodeLocations: Map[String, Set[SplitInfo]],
+      uiAddress: String,
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager): YarnAllocator
+
+  /**
+   * Unregister the AM. Guaranteed to only be called once.
+   *
+   * @param status The final status of the AM.
+   * @param diagnostics Diagnostics message to include in the final status.
+   */
+  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+
+  /** Returns the attempt ID. */
+  def getAttemptId(): ApplicationAttemptId
+
+  /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
+  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
+
+  /** Returns the maximum number of attempts to register the AM. */
+  def getMaxRegAttempts(conf: YarnConfiguration): Int
+
+}
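
A rough sketch of how an application master might drive this trait: register once to obtain the
allocator, call allocateResources() periodically (which doubles as the RM heartbeat), and
unregister on shutdown. The loop below is illustrative only; the failure threshold, sleep interval
and helper object are assumptions, not the actual ApplicationMaster logic.

    package org.apache.spark.deploy.yarn

    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.spark.{SecurityManager, SparkConf}

    object AllocationLoopSketch {
      def run(
          client: YarnRMClient,
          conf: YarnConfiguration,
          sparkConf: SparkConf,
          securityMgr: SecurityManager,
          uiAddress: String,
          historyAddress: String): Unit = {
        // Register with the RM; the returned YarnAllocator handles container requests.
        val allocator = client.register(conf, sparkConf, Map.empty, uiAddress, historyAddress,
          securityMgr)
        try {
          while (allocator.getNumExecutorsFailed < 3) {  // placeholder failure threshold
            allocator.allocateResources()                // also serves as the heartbeat to the RM
            Thread.sleep(5000)                           // placeholder reporting interval
          }
        } finally {
          client.unregister(FinalApplicationStatus.SUCCEEDED)
        }
      }
    }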

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000..8d4b96e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
+import scala.util._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+
+/**
+ * YarnRMClient implementation for the Yarn stable API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+  private var uiHistoryAddress: String = _
+  private var registered: Boolean = false
+
+  override def register(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      preferredNodeLocations: Map[String, Set[SplitInfo]],
+      uiAddress: String,
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager) = {
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(conf)
+    amClient.start()
+    this.uiHistoryAddress = uiHistoryAddress
+
+    logInfo("Registering the ApplicationMaster")
+    synchronized {
+      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+      registered = true
+    }
+    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
+      preferredNodeLocations, securityMgr)
+  }
+
+  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+    if (registered) {
+      amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+    }
+  }
+
+  override def getAttemptId() = {
+    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    appAttemptId
+  }
+
+  override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+    // so not all stable releases have it.
+    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+        .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+    try {
+      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+        classOf[Configuration])
+      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+      val hosts = proxies.map { proxy => proxy.split(":")(0) }
+      val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+    } catch {
+      case e: NoSuchMethodException =>
+        val proxy = WebAppUtils.getProxyHostAndPort(conf)
+        val parts = proxy.split(":")
+        val uriBase = prefix + proxy + proxyBase
+        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+    }
+  }
+
+  override def getMaxRegAttempts(conf: YarnConfiguration) =
+    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+}
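
The reflection in getAmIpFilterParams is a general version-probing pattern: look a method up by
name, use it if present, and fall back when the running Hadoop build predates it. A minimal
standalone variant of that probe, assuming hadoop-yarn-common is on the classpath (the printed
text is invented):

    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils

    object SchemeProbeSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // getHttpSchemePrefix only exists in newer stable YARN releases, so look it up
        // reflectively and default to "http://" when it is absent.
        val prefix = Try(
          classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
            .invoke(null, conf).asInstanceOf[String]
        ).getOrElse("http://")
        println(s"YARN web scheme prefix: $prefix")
      }
    }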

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
new file mode 100644
index 0000000..d7cf904
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Boolean => JBoolean}
+import java.io.File
+import java.util.{Collections, Set => JSet}
+import java.util.regex.Matcher
+import java.util.regex.Pattern
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.util.RackResolver
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+/**
+ * Contains utility methods to interact with Hadoop from Spark.
+ */
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
+
+  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    dest.addCredentials(source.getCredentials())
+  }
+
+  // Note that all parameters which start with SPARK are propagated all the way through, so if we
+  // are in YARN mode, this MUST return true.
+  override def isYarnMode(): Boolean = { true }
+
+  // Return an appropriate subclass of Configuration. Creating a config initializes some Hadoop
+  // subsystems. Always create a new config; don't reuse yarnConf.
+  override def newConfiguration(conf: SparkConf): Configuration =
+    new YarnConfiguration(super.newConfiguration(conf))
+
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
+  override def addCredentials(conf: JobConf) {
+    val jobCreds = conf.getCredentials()
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+  }
+
+  override def getCurrentUserCredentials(): Credentials = {
+    UserGroupInformation.getCurrentUser().getCredentials()
+  }
+
+  override def addCurrentUserCredentials(creds: Credentials) {
+    UserGroupInformation.getCurrentUser().addCredentials(creds)
+  }
+
+  override def addSecretKeyToUserCredentials(key: String, secret: String) {
+    val creds = new Credentials()
+    creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))
+    addCurrentUserCredentials(creds)
+  }
+
+  override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+    val credentials = getCurrentUserCredentials()
+    if (credentials != null) credentials.getSecretKey(new Text(key)) else null
+  }
+
+}
+
+object YarnSparkHadoopUtil {
+  // Additional memory overhead.
+  // 7% was arrived at experimentally, in the interest of minimizing memory waste while covering
+  // the common cases. Memory overhead tends to grow with container size.
+
+  val MEMORY_OVERHEAD_FACTOR = 0.07
+  val MEMORY_OVERHEAD_MIN = 384
+
+  val ANY_HOST = "*"
+
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
+  // All RM requests are issued with the same priority: we do not (yet) have any distinction
+  // between request types (like map/reduce in Hadoop, for example).
+  val RM_REQUEST_PRIORITY = 1
+
+  // Host-to-rack map, saved from allocation requests. We expect this not to change.
+  // Note that it is possible for it to change: the ResourceManager would indicate that to us via
+  // an updated allocate response. But we are punting on handling that for now.
+  private val hostToRack = new ConcurrentHashMap[String, String]()
+  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+  /**
+   * Add a path variable to the given environment map.
+   * If the map already contains this key, append the value to the existing value instead.
+   */
+  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+    val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+    env.put(key, newValue)
+  }
+
+  /**
+   * Set zero or more environment variables specified by the given input string.
+   * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
+   */
+  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
+    if (inputString != null && inputString.length() > 0) {
+      val childEnvs = inputString.split(",")
+      val p = Pattern.compile(environmentVariableRegex)
+      for (cEnv <- childEnvs) {
+        val parts = cEnv.split("=") // split on '='
+        val m = p.matcher(parts(1))
+        val sb = new StringBuffer
+        while (m.find()) {
+          val variable = m.group(1)
+          var replace = ""
+          if (env.get(variable) != None) {
+            replace = env.get(variable).get
+          } else {
+            // If this key is not configured for the child, get it from the environment.
+            replace = System.getenv(variable)
+            if (replace == null) {
+              // The env key is not present anywhere; simply set it to the empty string.
+              replace = ""
+            }
+          }
+          m.appendReplacement(sb, Matcher.quoteReplacement(replace))
+        }
+        m.appendTail(sb)
+        // This treats the environment variable as a path variable delimited by `File.pathSeparator`.
+        // This is kept for backward compatibility and consistency with Hadoop's behavior.
+        addPathToEnvironment(env, parts(0), sb.toString)
+      }
+    }
+  }
+
+  private val environmentVariableRegex: String = {
+    if (Utils.isWindows) {
+      "%([A-Za-z_][A-Za-z0-9_]*?)%"
+    } else {
+      "\\$([A-Za-z_][A-Za-z0-9_]*)"
+    }
+  }
+
+  /**
+   * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
+   * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
+   * argument is enclosed in single quotes and some key characters are escaped.
+   *
+   * @param arg A single argument.
+   * @return Argument quoted for execution via Yarn's generated shell script.
+   */
+  def escapeForShell(arg: String): String = {
+    if (arg != null) {
+      val escaped = new StringBuilder("'")
+      for (i <- 0 to arg.length() - 1) {
+        arg.charAt(i) match {
+          case '$' => escaped.append("\\$")
+          case '"' => escaped.append("\\\"")
+          case '\'' => escaped.append("'\\''")
+          case c => escaped.append(c)
+        }
+      }
+      escaped.append("'").toString()
+    } else {
+      arg
+    }
+  }
+
+  def lookupRack(conf: Configuration, host: String): String = {
+    if (!hostToRack.contains(host)) {
+      populateRackInfo(conf, host)
+    }
+    hostToRack.get(host)
+  }
+
+  def populateRackInfo(conf: Configuration, hostname: String) {
+    Utils.checkHost(hostname)
+
+    if (!hostToRack.containsKey(hostname)) {
+      // If there are repeated failures to resolve, add the host to an ignore list.
+      val rackInfo = RackResolver.resolve(conf, hostname)
+      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+        val rack = rackInfo.getNetworkLocation
+        hostToRack.put(hostname, rack)
+        if (! rackToHostSet.containsKey(rack)) {
+          rackToHostSet.putIfAbsent(rack,
+            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+        }
+        rackToHostSet.get(rack).add(hostname)
+
+        // TODO(harvey): Figure out what this comment means...
+        // Since RackResolver caches, we are disabling this for now ...
+      } /* else {
+        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
+        hostToRack.put(hostname, null)
+      } */
+    }
+  }
+
+  def getApplicationAclsForYarn(securityMgr: SecurityManager)
+      : Map[ApplicationAccessType, String] = {
+    Map[ApplicationAccessType, String] (
+      ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
+      ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
+    )
+  }
+
+}
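
A usage sketch for two of the helpers above (assuming the spark yarn module is on the classpath):
escapeForShell wraps an argument in single quotes and escapes $, double quotes and embedded single
quotes so it survives YARN's `bash -c "..."` launch, and setEnvFromInputString parses a
"KEY1=VAL1,KEY2=VAL2" string, appending path-style when a key repeats. The inputs below are made
up for illustration.

    import scala.collection.mutable.HashMap

    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    object YarnUtilUsageSketch {
      def main(args: Array[String]): Unit = {
        // '$' and '"' are backslash-escaped; embedded single quotes would become '\''.
        val quoted = YarnSparkHadoopUtil.escapeForShell("""say "hi" to $USER""")
        println(quoted)  // 'say \"hi\" to \$USER'

        // Repeated keys are appended with File.pathSeparator (':' on Unix).
        val env = new HashMap[String, String]()
        YarnSparkHadoopUtil.setEnvFromInputString(env,
          "CLASSPATH=/opt/jars/*,CLASSPATH=/etc/hadoop/conf")
        println(env("CLASSPATH"))  // /opt/jars/*:/etc/hadoop/conf on Unix
      }
    }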

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
new file mode 100644
index 0000000..254774a
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+/**
+ * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
+ */
+private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+  // By default, rack is unknown
+  override def getRackForHost(hostPort: String): Option[String] = {
+    val host = Utils.parseHostPort(hostPort)._1
+    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
new file mode 100644
index 0000000..2923e67
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+private[spark] class YarnClientSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends YarnSchedulerBackend(scheduler, sc)
+  with Logging {
+
+  private var client: Client = null
+  private var appId: ApplicationId = null
+  @volatile private var stopping: Boolean = false
+
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
+  override def start() {
+    super.start()
+    val driverHost = conf.get("spark.driver.host")
+    val driverPort = conf.get("spark.driver.port")
+    val hostport = driverHost + ":" + driverPort
+    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) }
+
+    val argsArrayBuf = new ArrayBuffer[String]()
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
+    val args = new ClientArguments(argsArrayBuf.toArray, conf)
+    totalExpectedExecutors = args.numExecutors
+    client = new Client(args, conf)
+    appId = client.submitApplication()
+    waitForApplication()
+    asyncMonitorApplication()
+  }
+
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
+      )
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
+      }
+    }
+    extraArgs
+  }
+
+  /**
+   * Report the state of the application until it is running.
+   * If the application has finished, failed or been killed in the process, throw an exception.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def waitForApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
+    val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
+    if (state == YarnApplicationState.FINISHED ||
+      state == YarnApplicationState.FAILED ||
+      state == YarnApplicationState.KILLED) {
+      throw new SparkException("Yarn application has already ended! " +
+        "It might have been killed or unable to launch application master.")
+    }
+    if (state == YarnApplicationState.RUNNING) {
+      logInfo(s"Application $appId has started running.")
+    }
+  }
+
+  /**
+   * Monitor the application state in a separate thread.
+   * If the application has exited for any reason, stop the SparkContext.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def asyncMonitorApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
+    val t = new Thread {
+      override def run() {
+        while (!stopping) {
+          val report = client.getApplicationReport(appId)
+          val state = report.getYarnApplicationState()
+          if (state == YarnApplicationState.FINISHED ||
+            state == YarnApplicationState.KILLED ||
+            state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application has already exited with state $state!")
+            sc.stop()
+            stopping = true
+          }
+          Thread.sleep(1000L)
+        }
+        Thread.currentThread().interrupt()
+      }
+    }
+    t.setName("Yarn application state monitor")
+    t.setDaemon(true)
+    t.start()
+  }
+
+  /**
+   * Stop the scheduler. This assumes `start()` has already been called.
+   */
+  override def stop() {
+    assert(client != null, "Attempted to stop this scheduler before starting it!")
+    stopping = true
+    super.stop()
+    client.stop()
+    logInfo("Stopped")
+  }
+
+  override def applicationId(): String = {
+    Option(appId).map(_.toString).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+  }
+
+}
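
The option mapping in getExtraClientArguments gives environment variables precedence over the
equivalent Spark property. A self-contained sketch of that resolution rule (the names and values
here are only examples):

    object ClientArgPrecedenceSketch {
      // Mirrors one (client option, env var, Spark property) tuple from the list above.
      def resolve(
          optionName: String,
          envVar: String,
          sparkProp: String,
          env: Map[String, String],
          conf: Map[String, String]): Seq[String] = {
        env.get(envVar).orElse(conf.get(sparkProp)) match {
          case Some(value) => Seq(optionName, value)
          case None => Seq.empty
        }
      }

      def main(args: Array[String]): Unit = {
        val env = Map("SPARK_EXECUTOR_MEMORY" -> "4g")
        val conf = Map("spark.executor.memory" -> "2g")
        // The environment variable wins, so Client is handed "--executor-memory 4g".
        println(resolve("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory",
          env, conf))  // List(--executor-memory, 4g)
      }
    }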

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
new file mode 100644
index 0000000..4157ff9
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+/**
+ * This is a simple extension to ClusterScheduler, to ensure that appropriate initialization of
+ * the ApplicationMaster, etc. is done.
+ */
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+  logInfo("Created YarnClusterScheduler")
+
+  // Nothing else for now ... initialize the application master, which needs a SparkContext to
+  // determine how to allocate.
+  // Note that only the first SparkContext created has any influence (and ideally there should be
+  // only one SparkContext). Subsequent creations are ignored, since executors are already
+  // allocated by then.
+
+  // By default, rack is unknown
+  override def getRackForHost(hostPort: String): Option[String] = {
+    val host = Utils.parseHostPort(hostPort)._1
+    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+  }
+
+  override def postStartHook() {
+    ApplicationMaster.sparkContextInitialized(sc)
+    super.postStartHook()
+    logInfo("YarnClusterScheduler.postStartHook done")
+  }
+
+  override def stop() {
+    super.stop()
+    ApplicationMaster.sparkContextStopped(sc)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000..b1de81e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends YarnSchedulerBackend(scheduler, sc) {
+
+  override def start() {
+    super.start()
+    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
+    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+        .getOrElse(totalExpectedExecutors)
+    }
+    // The Spark property can override the environment variable.
+    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+  }
+
+  override def applicationId(): String =
+    // In YARN cluster mode, spark.yarn.app.id is expected to be set
+    // before the user application is launched.
+    // So if spark.yarn.app.id is not set, something is wrong.
+    sc.getConf.getOption("spark.yarn.app.id").getOrElse {
+      logError("Application ID is not set.")
+      super.applicationId
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9dd05f1
--- /dev/null
+++ b/yarn/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
new file mode 100644
index 0000000..17b79ae
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
+import org.apache.spark.{SparkException, SparkConf}
+import org.apache.spark.util.Utils
+
+class ClientBaseSuite extends FunSuite with Matchers {
+
+  test("default Yarn application classpath") {
+    ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+  }
+
+  test("default MR application classpath") {
+    ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+  }
+
+  test("resultant classpath for an application that defines a classpath for YARN") {
+    withAppConf(Fixtures.mapYARNAppConf) { conf =>
+      val env = newEnv
+      ClientBase.populateHadoopClasspath(conf, env)
+      classpath(env) should be(
+        flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath))
+    }
+  }
+
+  test("resultant classpath for an application that defines a classpath for MR") {
+    withAppConf(Fixtures.mapMRAppConf) { conf =>
+      val env = newEnv
+      ClientBase.populateHadoopClasspath(conf, env)
+      classpath(env) should be(
+        flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+    }
+  }
+
+  test("resultant classpath for an application that defines both classpaths, YARN and MR") {
+    withAppConf(Fixtures.mapAppConf) { conf =>
+      val env = newEnv
+      ClientBase.populateHadoopClasspath(conf, env)
+      classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
+    }
+  }
+
+  private val SPARK = "local:/sparkJar"
+  private val USER = "local:/userJar"
+  private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
+
+  test("Local jar URIs") {
+    val conf = new Configuration()
+    val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+    val env = new MutableHashMap[String, String]()
+    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+    ClientBase.populateClasspath(args, conf, sparkConf, env)
+
+    val cp = env("CLASSPATH").split(File.pathSeparator)
+    s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
+      val uri = new URI(entry)
+      if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
+        cp should contain (uri.getPath())
+      } else {
+        cp should not contain (uri.getPath())
+      }
+    })
+    cp should contain (Environment.PWD.$())
+    cp should contain (s"${Environment.PWD.$()}${File.separator}*")
+    cp should not contain (ClientBase.SPARK_JAR)
+    cp should not contain (ClientBase.APP_JAR)
+  }
+
+  test("Jar path propagation through SparkConf") {
+    val conf = new Configuration()
+    val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+    val yarnConf = new YarnConfiguration()
+    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+    val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+      any(classOf[Path]), anyShort(), anyBoolean())
+
+    val tempDir = Utils.createTempDir()
+    try {
+      client.prepareLocalResources(tempDir.getAbsolutePath())
+      sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
+
+      // The non-local path should be propagated by name only, since it will end up in the app's
+      // staging dir.
+      val expected = ADDED.split(",")
+        .map(p => {
+          val uri = new URI(p)
+          if (ClientBase.LOCAL_SCHEME == uri.getScheme()) {
+            p
+          } else {
+            Option(uri.getFragment()).getOrElse(new File(p).getName())
+          }
+        })
+        .mkString(",")
+
+      sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("check access nns empty") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns unset") {
+    val sparkConf = new SparkConf()
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access nns space") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access two nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+    val renewer = ClientBase.getTokenRenewer(hadoopConf)
+    renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        ClientBase.getTokenRenewer(hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+
+  object Fixtures {
+
+    val knownDefYarnAppCP: Seq[String] =
+      getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
+                                                "DEFAULT_YARN_APPLICATION_CLASSPATH",
+                                                Seq[String]())(a => a.toSeq)
+
+
+    val knownDefMRAppCP: Seq[String] =
+      getFieldValue2[String, Array[String], Seq[String]](
+        classOf[MRJobConfig],
+        "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
+        Seq[String]())(a => a.split(","))(a => a.toSeq)
+
+    val knownYARNAppCP = Some(Seq("/known/yarn/path"))
+
+    val knownMRAppCP = Some(Seq("/known/mr/path"))
+
+    val mapMRAppConf =
+      Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
+
+    val mapYARNAppConf =
+      Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
+
+    val mapAppConf = mapYARNAppConf ++ mapMRAppConf
+  }
+
+  def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
+    val conf = new Configuration
+    m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") }
+    testCode(conf)
+  }
+
+  def newEnv = MutableHashMap[String, String]()
+
+  def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
+
+  def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
+
+  def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B =
+    Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)
+
+  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
+        clazz: Class[_],
+        field: String,
+        defaults: => B)(mapTo:  A => B)(mapTo1: A1 => B) : B = {
+    Try(clazz.getField(field)).map(_.get(null)).map {
+      case v: A => mapTo(v)
+      case v1: A1 => mapTo1(v1)
+      case _ => defaults
+    }.toOption.getOrElse(defaults)
+  }
+
+  private class DummyClient(
+      val args: ClientArguments,
+      val hadoopConf: Configuration,
+      val sparkConf: SparkConf,
+      val yarnConf: YarnConfiguration) extends ClientBase {
+    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ???
+    override def submitApplication(): ApplicationId = ???
+    override def getApplicationReport(appId: ApplicationId): ApplicationReport = ???
+    override def getClientToken(report: ApplicationReport): String = ???
+  }
+
+}
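
The "check access nns" cases above pin down how the spark.yarn.access.namenodes list is
parsed. A simplified sketch of that behavior, for illustration only (the real logic lives in
ClientBase.getNameNodesToAccess and may differ in detail):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkConf

    def nameNodesToAccess(sparkConf: SparkConf): Set[Path] =
      sparkConf.get("spark.yarn.access.namenodes", "")
        .split(",")               // comma-separated list of namenode URIs
        .map(_.trim)              // tolerate stray whitespace ("hdfs://nn1:8032, ")
        .filter(_.nonEmpty)       // an empty or unset setting yields an empty set
        .map(new Path(_))
        .toSet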

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
new file mode 100644
index 0000000..80b57d1
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito.when
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+
+class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
+
+  class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
+    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): 
+        LocalResourceVisibility = {
+      LocalResourceVisibility.PRIVATE
+    }
+  }
+  
+  test("test getFileStatus empty") {
+    val distMgr = new ClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val uri = new URI("/tmp/testing")
+    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val stat = distMgr.getFileStatus(fs, uri, statCache)
+    assert(stat.getPath() === null)
+  }
+
+  test("test getFileStatus cached") {
+    val distMgr = new ClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val uri = new URI("/tmp/testing")
+    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
+    val stat = distMgr.getFileStatus(fs, uri, statCache)
+    assert(stat.getPath().toString() === "/tmp/testing")
+  }
+
+  test("test addResource") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", 
+      statCache, false)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 0)
+    assert(resource.getSize() === 0)
+    assert(resource.getType() === LocalResourceType.FILE)
+
+    val env = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env)
+    assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
+    assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
+    assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+
+    // Add another resource and verify that both entries are present and in the correct order.
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing2"))
+    val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
+    when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
+    distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", 
+      statCache, false)
+    val resource2 = localResources("link2")
+    assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
+    assert(resource2.getTimestamp() === 10)
+    assert(resource2.getSize() === 20)
+    assert(resource2.getType() === LocalResourceType.FILE)
+
+    val env2 = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env2)
+    val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+    val files = env2("SPARK_YARN_CACHE_FILES").split(',') 
+    val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+    val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+    assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(timestamps(0)  === "0")
+    assert(sizes(0)  === "0")
+    assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
+
+    assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
+    assert(timestamps(1)  === "10")
+    assert(sizes(1)  === "20")
+    assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
+  }
+
+  test("test addResource link null") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+    intercept[Exception] {
+      distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, 
+        statCache, false)
+    }
+    assert(localResources.get("link") === None)
+    assert(localResources.size === 0)
+  }
+
+  test("test addResource appmaster only") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", 
+      statCache, true)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 10)
+    assert(resource.getSize() === 20)
+    assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+    val env = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+  }
+
+  test("test addResource archive") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", 
+      statCache, false)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 10)
+    assert(resource.getSize() === 20)
+    assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+    val env = new HashMap[String, String]()
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+    distMgr.setDistFilesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+  }
+
+
+}
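
The assertions above rely on the SPARK_YARN_CACHE_* variables being positionally aligned,
comma-separated lists. A hypothetical helper, shown only to make that alignment explicit
(the helper name and the CacheEntry type are not part of Spark):

    case class CacheEntry(uri: String, timestamp: Long, size: Long, visibility: String)

    def parseCacheFilesEnv(env: Map[String, String]): Seq[CacheEntry] = {
      def list(key: String): Array[String] =
        env.getOrElse(key, "").split(",").filter(_.nonEmpty)

      val files = list("SPARK_YARN_CACHE_FILES")
      val times = list("SPARK_YARN_CACHE_FILES_TIME_STAMPS")
      val sizes = list("SPARK_YARN_CACHE_FILES_FILE_SIZES")
      val vis   = list("SPARK_YARN_CACHE_FILES_VISIBILITIES")

      // Entry i of each list describes the same resource, in the order addResource was called.
      files.indices.map(i => CacheEntry(files(i), times(i).toLong, sizes(i).toLong, vis(i)))
    }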

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
new file mode 100644
index 0000000..8d184a0
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.spark.deploy.yarn.YarnAllocator._
+import org.scalatest.FunSuite
+
+class YarnAllocatorSuite extends FunSuite {
+  test("memory exceeded diagnostic regexes") {
+    val diagnostics =
+      "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
+      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+      "5.8 GB of 4.2 GB virtual memory used. Killing container."
+    val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
+    val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
+    assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
+    assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
+  }
+}
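
The diagnostic string in this test follows the "X of Y physical/virtual memory used" format
emitted by the YARN NodeManager. Patterns in that spirit are sketched below for illustration
only; the exact regexes used by the allocator are PMEM_EXCEEDED_PATTERN and
VMEM_EXCEEDED_PATTERN in YarnAllocator and may be written differently:

    import java.util.regex.Pattern

    val pmemPattern =
      Pattern.compile("\\d+(\\.\\d+)? [KMG]B of \\d+(\\.\\d+)? [KMG]B physical memory used")
    val vmemPattern =
      Pattern.compile("\\d+(\\.\\d+)? [KMG]B of \\d+(\\.\\d+)? [KMG]B virtual memory used")

    // Returns the matched fragment of a container diagnostic, if any.
    def firstMatch(pattern: Pattern, diagnostics: String): Option[String] = {
      val m = pattern.matcher(diagnostics)
      if (m.find()) Some(m.group()) else None
    }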

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
new file mode 100644
index 0000000..d79b85e
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
+
+  // log4j configuration for the Yarn containers, so that their output is collected
+  // by Yarn instead of trying to overwrite unit-tests.log.
+  private val LOG4J_CONF = """
+    |log4j.rootCategory=DEBUG, console
+    |log4j.appender.console=org.apache.log4j.ConsoleAppender
+    |log4j.appender.console.target=System.err
+    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
+    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+    """.stripMargin
+
+  private var yarnCluster: MiniYARNCluster = _
+  private var tempDir: File = _
+  private var fakeSparkJar: File = _
+  private var oldConf: Map[String, String] = _
+
+  override def beforeAll() {
+    tempDir = Utils.createTempDir()
+
+    val logConfDir = new File(tempDir, "log4j")
+    logConfDir.mkdir()
+
+    val logConfFile = new File(logConfDir, "log4j.properties")
+    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
+
+    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
+      sys.props("java.class.path")
+
+    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+
+    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
+    yarnCluster.init(new YarnConfiguration())
+    yarnCluster.start()
+
+    // There's a race in MiniYARNCluster in which start() may return before the RM has updated
+    // its address in the configuration. You can see this in the logs by noticing that when
+    // MiniYARNCluster prints the address, it still has port "0" assigned, although the test
+    // sometimes works later anyway:
+    //
+    //    INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
+    //
+    // That log message prints the contents of the RM_ADDRESS config variable. If you check it
+    // later on, it looks something like this:
+    //
+    //    INFO YarnClusterSuite: RM address in configuration is blah:42631
+    //
+    // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
+    // done so in a timely manner (defined to be 10 seconds).
+    val config = yarnCluster.getConfig()
+    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
+    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+      if (System.currentTimeMillis() > deadline) {
+        throw new IllegalStateException("Timed out waiting for RM to come up.")
+      }
+      logDebug("RM address still not set in configuration, waiting...")
+      TimeUnit.MILLISECONDS.sleep(100)
+    }
+
+    logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
+    config.foreach { e =>
+      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
+    }
+
+    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
+    sys.props += ("spark.executor.instances" -> "1")
+    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
+    sys.props += ("spark.executor.extraClassPath" -> childClasspath)
+
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    yarnCluster.stop()
+    sys.props.retain { case (k, v) => !k.startsWith("spark.") }
+    sys.props ++= oldConf
+    super.afterAll()
+  }
+
+  test("run Spark in yarn-client mode") {
+    val result = File.createTempFile("result", null, tempDir)
+    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+    val result = File.createTempFile("result", null, tempDir)
+
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--num-executors", "1")
+    Client.main(args)
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode unsuccessfully") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+    // Use only one argument so the driver will fail
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--num-executors", "1")
+    val exception = intercept[SparkException] {
+      Client.main(args)
+    }
+    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+  }
+
+  /**
+   * This is a workaround for an issue with yarn-cluster mode: the Client class does not report
+   * any error when the driver process exits successfully even though the job itself has failed.
+   * So the tests require the job to write a marker to a file once everything has completed
+   * successfully, to indicate that it actually ran.
+   */
+  private def checkResult(result: File) = {
+    val resultString = Files.toString(result, Charsets.UTF_8)
+    resultString should be ("success")
+  }
+
+}
+
+private object YarnClusterDriver extends Logging with Matchers {
+
+  def main(args: Array[String]) = {
+    if (args.length != 2) {
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriver [master] [result file]
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf().setMaster(args(0))
+      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      val data = sc.parallelize(1 to 4, 4).collect().toSet
+      data should be (Set(1, 2, 3, 4))
+      result = "success"
+    } finally {
+      sc.stop()
+      Files.write(result, status, Charsets.UTF_8)
+    }
+  }
+
+}
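
The deadline loop in beforeAll() above is a pattern worth keeping reusable if more
MiniYARNCluster-based suites are added. A generic sketch, with an illustrative helper name
and defaults:

    import java.util.concurrent.TimeUnit

    // Polls a condition until it holds or the deadline passes, then fails loudly.
    def waitUntil(timeoutMs: Long, pollMs: Long = 100)(condition: => Boolean): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (!condition) {
        if (System.currentTimeMillis() > deadline) {
          throw new IllegalStateException(s"Timed out after $timeoutMs ms waiting for condition.")
        }
        TimeUnit.MILLISECONDS.sleep(pollMs)
      }
    }

The RM-address wait in the suite would then read:

    waitUntil(TimeUnit.SECONDS.toMillis(10)) {
      config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0"
    }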


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org