You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:47 UTC

[10/18] spark git commit: [SPARK-18662] Move resource managers to separate directory

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
new file mode 100644
index 0000000..8772e26
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
+
+/**
+ * Locality preference of a single container request.
+ *
+ * @param nodes preferred host names; the placement strategy sets this to null for a container
+ *              that has no locality preference
+ * @param racks rack names resolved for the preferred hosts; null when `nodes` is null
+ */
+private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, number of required cores/containers and the locality of current
+ * existing and pending allocated containers. The target of this algorithm is to maximize the number
+ * of tasks that would run locally.
+ *
+ * Consider a situation in which we have 20 tasks that require (host1, host2, host3)
+ * and 10 tasks that require (host1, host2, host4), besides each container has 2 cores
+ * and cpus per task is 1, so the required container number is 15,
+ * and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1)
+ *
+ * 3. If containers exist but none of them can match the requested localities,
+ * follow the method of 1 and 2.
+ *
+ * 4. If containers exist and some of them can match the requested localities.
+ * For example if we have 1 container on each node (host1: 1, host2: 1, host3: 1, host4: 1),
+ * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If requested container number (10) is less than newly required containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers exist and existing localities can fully cover the requested localities.
+ * For example if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5),
+ * which could cover the current requested localities. This algorithm will allocate all the
+ * requested containers with no localities.
+ */
+private[yarn] class LocalityPreferredContainerPlacementStrategy(
+    val sparkConf: SparkConf,
+    val yarnConf: Configuration,
+    val resource: Resource) {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwareTasks number of locality required tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+   *                             numbers running on it, used as hints for container allocation
+   * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+   *                                     expected locality preference by considering the existing
+   *                                     containers
+   * @param localityMatchedPendingAllocations A sequence of pending container request which
+   *                                          matches the localities of current required tasks.
+   * @return node localities and rack localities, each locality is an array of string,
+   *         the length of localities is the same as number of containers. Containers without a
+   *         locality preference come first and carry null node and rack arrays.
+   */
+  def localityOfRequestedContainers(
+      numContainer: Int,
+      numLocalityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+      localityMatchedPendingAllocations: Seq[ContainerRequest]
+    ): Array[ContainerLocalityPreferences] = {
+    val updatedHostToContainerCount = expectedHostToContainerCount(
+      numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
+        localityMatchedPendingAllocations)
+    val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
+
+    // The number of containers to allocate, divided into two groups, one with preferred locality,
+    // and the other without locality preference.
+    val requiredLocalityFreeContainerNum =
+      math.max(0, numContainer - updatedLocalityAwareContainerNum)
+    val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
+
+    val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
+    // Callers recognise a locality-free request by its null node array, so null is kept here.
+    for (_ <- 0 until requiredLocalityFreeContainerNum) {
+      containerLocalityPreferences += ContainerLocalityPreferences(
+        null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
+    }
+
+    if (requiredLocalityAwareContainerNum > 0) {
+      // At least one host has a positive count here, because the locality aware container
+      // number never exceeds the sum of the per-host counts.
+      val largestRatio = updatedHostToContainerCount.values.max
+      // Round the ratio of preferred locality to the number of locality required container
+      // number, which is used for locality preferred host calculating.
+      //
+      // A strict `map` is used on purpose: `mapValues` only returns a lazy view, and re-wrapping
+      // that view once per loop iteration below builds a chain of nested views whose evaluation
+      // cost grows with every iteration and can end in a StackOverflowError.
+      var preferredLocalityRatio = updatedHostToContainerCount.map { case (host, ratio) =>
+        val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
+        (host, adjustedRatio.ceil.toInt)
+      }
+
+      for (_ <- 0 until requiredLocalityAwareContainerNum) {
+        // Only filter out the ratio which is larger than 0, which means the current host can
+        // still be allocated with new container request.
+        val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+        val racks = hosts.map { h =>
+          RackResolver.resolve(yarnConf, h).getNetworkLocation
+        }.toSet
+        containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
+
+        // Minus 1 each time when the host is used. When the current ratio is 0,
+        // which means all the required ratio is satisfied, this host will not be allocated again.
+        preferredLocalityRatio = preferredLocalityRatio.map { case (host, ratio) =>
+          (host, ratio - 1)
+        }
+      }
+    }
+
+    containerLocalityPreferences.toArray
+  }
+
+  /**
+   * Calculate the number of executors need to satisfy the given number of pending tasks.
+   * This is the required core count (tasks * cpus per task) divided by the cores of one
+   * container, rounded up.
+   */
+  private def numExecutorsPending(numTasksPending: Int): Int = {
+    val coresPerExecutor = resource.getVirtualCores
+    // Integer ceiling division: adding (divisor - 1) before dividing rounds the result up.
+    (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
+  }
+
+  /**
+   * Calculate the expected host to number of containers by considering with allocated containers.
+   * @param localityAwareTasks number of locality aware tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+   *                             numbers running on it, used as hints for container allocation
+   * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+   *                                     expected locality preference by considering the existing
+   *                                     containers
+   * @param localityMatchedPendingAllocations A sequence of pending container request which
+   *                                          matches the localities of current required tasks.
+   * @return a map with hostname as key and required number of containers on this host as value
+   */
+  private def expectedHostToContainerCount(
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+      localityMatchedPendingAllocations: Seq[ContainerRequest]
+    ): Map[String, Int] = {
+    val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+    val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)
+
+    hostToLocalTaskCount.map { case (host, count) =>
+      // Share of the pending executors that this host should receive, proportional to the
+      // number of tasks that prefer it.
+      val expectedCount =
+        count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
+      // Take the locality of pending containers into consideration
+      val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
+        pendingHostToContainersMap.getOrElse(host, 0.0)
+
+      // If existing container can not fully satisfy the expected number of container,
+      // the required container number is expected count minus existed count. Otherwise the
+      // required container number is 0.
+      (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
+    }
+  }
+
+  /**
+   * According to the locality ratio and number of container requests, calculate the host to
+   * possible number of containers for pending allocated containers.
+   *
+   * If current locality ratio of hosts is: Host1 : Host2 : Host3 = 20 : 20 : 10,
+   * and pending container requests is 3, so the possible number of containers on
+   * Host1 : Host2 : Host3 will be 1.2 : 1.2 : 0.6.
+   * @param localityMatchedPendingAllocations A sequence of pending container request which
+   *                                          matches the localities of current required tasks.
+   * @return a Map with hostname as key and possible number of containers on this host as value
+   */
+  private def pendingHostToContainerCount(
+      localityMatchedPendingAllocations: Seq[ContainerRequest]): Map[String, Double] = {
+    // Count how many pending requests name each host among their preferred nodes.
+    val pendingHostToContainerCount = new HashMap[String, Int]()
+    localityMatchedPendingAllocations.foreach { cr =>
+      cr.getNodes.asScala.foreach { n =>
+        val count = pendingHostToContainerCount.getOrElse(n, 0) + 1
+        pendingHostToContainerCount(n) = count
+      }
+    }
+
+    // Scale the per-host counts so that they add up to the number of pending requests.
+    val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
+    val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
+    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
+      .toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
new file mode 100644
index 0000000..0b66d1c
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.Collections
+import java.util.concurrent._
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.RackResolver
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+
+/**
+ * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
+ * what to do with containers when YARN fulfills these requests.
+ *
+ * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
+ * * Making our resource needs known, which updates local bookkeeping about containers requested.
+ * * Calling "allocate", which syncs our local container requests with the RM, and returns any
+ *   containers that YARN has granted to us.  This also functions as a heartbeat.
+ * * Processing the containers granted to us to possibly launch executors inside of them.
+ *
+ * The public methods of this class are thread-safe.  All methods that mutate state are
+ * synchronized.
+ */
+private[yarn] class YarnAllocator(
+    driverUrl: String,
+    driverRef: RpcEndpointRef,
+    conf: YarnConfiguration,
+    sparkConf: SparkConf,
+    amClient: AMRMClient[ContainerRequest],
+    appAttemptId: ApplicationAttemptId,
+    securityMgr: SecurityManager,
+    localResources: Map[String, LocalResource])
+  extends Logging {
+
+  import YarnAllocator._
+
+  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
+  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
+    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
+  }
+
+  // Visible for testing.
+  val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
+  val allocatedContainerToHostMap = new HashMap[ContainerId, String]
+
+  // Containers that we no longer care about. We've either already told the RM to release them or
+  // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
+  // completed.
+  private val releasedContainers = Collections.newSetFromMap[ContainerId](
+    new ConcurrentHashMap[ContainerId, java.lang.Boolean])
+
+  @volatile private var numExecutorsRunning = 0
+
+  /**
+   * Used to generate a unique ID per executor
+   *
+   * Initialization of `executorIdCounter`: when the AM restarts, `executorIdCounter` would be
+   * reset to 0, so the IDs of new executors would start from 1 again and conflict with the
+   * executors that were already created before the restart. To avoid this, `executorIdCounter`
+   * is initialized with the max executorId obtained from the driver.
+   *
+   * This executorId conflict only happens in yarn client mode, so it is an issue of yarn client
+   * mode only. For more details, see the JIRA.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int =
+    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+
+  // Queue to store the timestamp of failed executors
+  private val failedExecutorsTimeStamps = new Queue[Long]()
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+
+  @volatile private var targetNumExecutors =
+    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+
+  // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
+  // list of requesters that should be responded to once we find out why the given executor
+  // was lost.
+  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
+  // Maintain loss reasons for already released executors, it will be added when executor loss
+  // reason is got from AM-RM call, and be removed after querying this loss reason.
+  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
+  // Keep track of which container is running which executor to remove the executors later
+  // Visible for testing.
+  private[yarn] val executorIdToContainer = new HashMap[String, Container]
+
+  private var numUnexpectedContainerRelease = 0L
+  private val containerIdToExecutorId = new HashMap[ContainerId, String]
+
+  // Executor memory in MB.
+  protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+  // Additional memory overhead.
+  protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
+  // Number of cores per executor.
+  protected val executorCores = sparkConf.get(EXECUTOR_CORES)
+  // Resource capability requested for each executors
+  private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
+
+  private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
+    "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
+
+  // For testing
+  private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
+
+  private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
+
+  // ContainerRequest constructor that can take a node label expression. We grab it through
+  // reflection because it's only available in later versions of YARN.
+  private val nodeLabelConstructor = labelExpression.flatMap { expr =>
+    // Parameter types of the constructor variant that additionally accepts a label expression,
+    // in the order used by createContainerRequest: resource, nodes, racks, priority, a boolean
+    // flag and the label expression string.
+    val labelAwareSignature: Seq[Class[_]] = Seq(
+      classOf[Resource],
+      classOf[Array[String]],
+      classOf[Array[String]],
+      classOf[Priority],
+      classOf[Boolean],
+      classOf[String])
+    try {
+      Some(classOf[ContainerRequest].getConstructor(labelAwareSignature: _*))
+    } catch {
+      // The constructor is missing on this classpath: warn once and fall back to no labels.
+      case _: NoSuchMethodException =>
+        logWarning(s"Node label expression $expr will be ignored because YARN version on" +
+          " classpath does not support it.")
+        None
+    }
+  }
+
+  // A map to store preferred hostname and possible task numbers running on it.
+  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
+
+  // Number of tasks that have locality preferences in active stages
+  private var numLocalityAwareTasks: Int = 0
+
+  // A container placement strategy based on pending tasks' locality preference
+  private[yarn] val containerPlacementStrategy =
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+
+  /**
+   * Use a different clock for YarnAllocator. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    // The clock is consulted by getNumExecutorsFailed when expiring old failure timestamps.
+    clock = newClock
+  }
+
+  /** Current value of the running-executor counter maintained by this allocator. */
+  def getNumExecutorsRunning: Int = numExecutorsRunning
+
+  /**
+   * Number of recorded executor failures. When a positive validity interval is configured,
+   * failure timestamps older than that interval are dropped from the head of the queue first.
+   */
+  def getNumExecutorsFailed: Int = synchronized {
+    val now = clock.getTimeMillis()
+    val expiryEnabled = executorFailuresValidityInterval > 0
+
+    // True while the oldest recorded failure lies outside of the validity window.
+    def headExpired: Boolean =
+      failedExecutorsTimeStamps.headOption.exists(_ < now - executorFailuresValidityInterval)
+
+    while (expiryEnabled && headExpired) {
+      failedExecutorsTimeStamps.dequeue()
+    }
+
+    failedExecutorsTimeStamps.size
+  }
+
+  /**
+   * A sequence of pending container requests that have not yet been fulfilled.
+   */
+  // NOTE(review): ANY_HOST is presumably YARN's wildcard resource name ("*") -- confirm.
+  def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
+
+  /**
+   * A sequence of pending container requests at the given location that have not yet been
+   * fulfilled.
+   */
+  private def getPendingAtLocation(location: String): Seq[ContainerRequest] = {
+    // The client hands back the matching requests as a list of groups; flatten the groups into
+    // one sequence of requests.
+    val requestGroups = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource)
+    requestGroups.asScala.flatMap(_.asScala).toSeq
+  }
+
+  /**
+   * Request as many executors from the ResourceManager as needed to reach the desired total. If
+   * the requested total is smaller than the current number of running executors, no executors will
+   * be killed.
+   * @param requestedTotal total number of containers requested
+   * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
+   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
+   *                             container placement hint.
+   * @return Whether the new requested total is different than the old value.
+   */
+  def requestTotalExecutorsWithPreferredLocalities(
+      requestedTotal: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+    // The locality hints are always refreshed, even when the target itself is unchanged.
+    numLocalityAwareTasks = localityAwareTasks
+    hostToLocalTaskCounts = hostToLocalTaskCount
+
+    val targetChanged = requestedTotal != targetNumExecutors
+    if (targetChanged) {
+      logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
+      targetNumExecutors = requestedTotal
+    }
+    targetChanged
+  }
+
+  /**
+   * Request that the ResourceManager release the container running the specified executor.
+   */
+  def killExecutor(executorId: String): Unit = synchronized {
+    executorIdToContainer.get(executorId) match {
+      // Only release the container and decrement the running count the first time an executor
+      // is killed. The executor stays in `executorIdToContainer` after this call, so without
+      // the `releasedContainers` check a repeated kill of the same executor would decrement
+      // `numExecutorsRunning` again and inflate the number of containers requested afterwards.
+      // NOTE(review): relies on internalReleaseContainer recording the container in
+      // `releasedContainers` -- confirm against its definition.
+      case Some(container) if !releasedContainers.contains(container.getId) =>
+        internalReleaseContainer(container)
+        numExecutorsRunning -= 1
+      case Some(_) =>
+        logWarning(s"Attempted to kill executor $executorId, which has already been released!")
+      case None =>
+        logWarning(s"Attempted to kill unknown executor $executorId!")
+    }
+  }
+
+  /**
+   * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
+   * equal to maxExecutors.
+   *
+   * Deal with any containers YARN has granted to us by possibly launching executors in them.
+   *
+   * This must be synchronized because variables read in this method are mutated by other methods.
+   */
+  def allocateResources(): Unit = synchronized {
+    updateResourceRequests()
+
+    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
+    // requests.
+    val progressIndicator = 0.1f
+    val response = amClient.allocate(progressIndicator)
+
+    // Containers newly granted by this poll, if any, are matched against our requests.
+    val granted = response.getAllocatedContainers()
+    if (!granted.isEmpty) {
+      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+        .format(granted.size, numExecutorsRunning, response.getAvailableResources))
+      handleAllocatedContainers(granted.asScala)
+    }
+
+    // Containers reported as completed by this poll, if any, are processed afterwards.
+    val finished = response.getCompletedContainersStatuses()
+    if (!finished.isEmpty) {
+      logDebug("Completed %d containers".format(finished.size))
+      processCompletedContainers(finished.asScala)
+      logDebug("Finished processing %d completed containers. Current running executor count: %d."
+        .format(finished.size, numExecutorsRunning))
+    }
+  }
+
+  /**
+   * Update the set of container requests that we will sync with the RM based on the number of
+   * executors we have currently running and our target number of executors.
+   *
+   * Visible for testing.
+   */
+  def updateResourceRequests(): Unit = {
+    val pendingAllocate = getPendingAllocate
+    val numPendingAllocate = pendingAllocate.size
+    // Positive: more containers have to be requested to reach the target.
+    // Negative: more executors are running or requested than the target asks for.
+    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+
+    if (missing > 0) {
+      logInfo(s"Will request $missing executor container(s), each with " +
+        s"${resource.getVirtualCores} core(s) and " +
+        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
+
+      // Split the pending container request into three groups: locality matched list, locality
+      // unmatched list and non-locality list. Take the locality matched container request into
+      // consideration of container placement, treat as allocated containers.
+      // For locality unmatched and locality free container requests, cancel these container
+      // requests, since required locality preference has been changed, recalculating using
+      // container placement strategy.
+      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
+        hostToLocalTaskCounts, pendingAllocate)
+
+      // cancel "stale" requests for locations that are no longer needed
+      staleRequests.foreach { stale =>
+        amClient.removeContainerRequest(stale)
+      }
+      val cancelledContainers = staleRequests.size
+      if (cancelledContainers > 0) {
+        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+      }
+
+      // consider the number of new containers and cancelled stale containers available
+      val availableContainers = missing + cancelledContainers
+
+      // to maximize locality, include requests with no locality preference that can be cancelled
+      val potentialContainers = availableContainers + anyHostRequests.size
+
+      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
+        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
+          allocatedHostToContainersMap, localRequests)
+
+      // Only preferences that carry nodes become localized requests; preferences with a null
+      // node array (no locality) are skipped here and handled by the branch below.
+      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+      containerLocalityPreferences.foreach {
+        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+          newLocalityRequests += createContainerRequest(resource, nodes, racks)
+        case _ =>
+      }
+
+      if (availableContainers >= newLocalityRequests.size) {
+        // more containers are available than needed for locality, fill in requests for any host
+        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+          newLocalityRequests += createContainerRequest(resource, null, null)
+        }
+      } else {
+        val numToCancel = newLocalityRequests.size - availableContainers
+        // cancel some requests without locality preferences to schedule more local containers
+        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+          amClient.removeContainerRequest(nonLocal)
+        }
+        if (numToCancel > 0) {
+          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+        }
+      }
+
+      // Hand every new request (localized and unlocalized) to the client.
+      newLocalityRequests.foreach { request =>
+        amClient.addContainerRequest(request)
+      }
+
+      // The partitioning below is only needed for logging, so skip it when INFO is disabled.
+      if (log.isInfoEnabled()) {
+        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+        if (anyHost.nonEmpty) {
+          logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+        }
+        localized.foreach { request =>
+          logInfo(s"Submitted container request for host ${hostStr(request)}.")
+        }
+      }
+    } else if (numPendingAllocate > 0 && missing < 0) {
+      // Never cancel more requests than are actually pending.
+      val numToCancel = math.min(numPendingAllocate, -missing)
+      logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
+        s"total $targetNumExecutors executors.")
+
+      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
+      if (!matchingRequests.isEmpty) {
+        // Only the first group of matching requests returned by the client is considered here.
+        matchingRequests.iterator().next().asScala
+          .take(numToCancel).foreach(amClient.removeContainerRequest)
+      } else {
+        logWarning("Expected to find pending requests, but found none.")
+      }
+    }
+  }
+
+  /** Comma-separated nodes of the request, or "Any" when the request's node list is null. */
+  private def hostStr(request: ContainerRequest): String = {
+    Option(request.getNodes).map(_.asScala.mkString(",")).getOrElse("Any")
+  }
+
+  /**
+   * Creates a container request, handling the reflection required to use YARN features that were
+   * added in recent versions.
+   */
+  private def createContainerRequest(
+      resource: Resource,
+      nodes: Array[String],
+      racks: Array[String]): ContainerRequest = {
+    nodeLabelConstructor match {
+      // The reflectively looked-up constructor is available: pass the label expression along.
+      case Some(constructor) =>
+        constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY,
+          true: java.lang.Boolean, labelExpression.orNull)
+      // Otherwise use the plain constructor, which takes no label expression.
+      case None =>
+        new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY)
+    }
+  }
+
+  /**
+   * Handle containers granted by the RM by launching executors on them.
+   *
+   * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
+   * in YARN granting containers that we no longer need. In this case, we release them.
+   *
+   * Visible for testing.
+   */
+  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
+
+    // Pass 1: match each container against requests for the exact host it was allocated on.
+    val unmatchedByHost = new ArrayBuffer[Container]
+    allocatedContainers.foreach { container =>
+      matchContainerToRequest(container, container.getNodeId.getHost, containersToUse,
+        unmatchedByHost)
+    }
+
+    // Pass 2: match what is left against requests for the rack of the container's host.
+    val unmatchedByRack = new ArrayBuffer[Container]
+    unmatchedByHost.foreach { container =>
+      val rack = RackResolver.resolve(conf, container.getNodeId.getHost).getNetworkLocation
+      matchContainerToRequest(container, rack, containersToUse, unmatchedByRack)
+    }
+
+    // Pass 3: match what is still left against requests that are neither node-local nor
+    // rack-local.
+    val unneeded = new ArrayBuffer[Container]
+    unmatchedByRack.foreach { container =>
+      matchContainerToRequest(container, ANY_HOST, containersToUse, unneeded)
+    }
+
+    // Containers that matched no request in any pass are released.
+    if (unneeded.nonEmpty) {
+      logDebug(s"Releasing ${unneeded.size} unneeded containers that were " +
+        s"allocated to us")
+      unneeded.foreach(internalReleaseContainer)
+    }
+
+    runAllocatedContainers(containersToUse)
+
+    logInfo("Received %d containers from YARN, launching executors on %d of them."
+      .format(allocatedContainers.size, containersToUse.size))
+  }
+
+  /**
+   * Tries to pair an allocated container with an outstanding request for `location`. On a match
+   * the request is removed from the AM-RM client (so it is not submitted again) and the container
+   * is appended to `containersToUse`; otherwise the container is appended to `remaining`.
+   *
+   * @param allocatedContainer container that was given to us by YARN
+   * @param location resource name, either a node, rack, or *
+   * @param containersToUse list of containers that will be used
+   * @param remaining list of containers that will not be used
+   */
+  private def matchContainerToRequest(
+      allocatedContainer: Container,
+      location: String,
+      containersToUse: ArrayBuffer[Container],
+      remaining: ArrayBuffer[Container]): Unit = {
+    // SPARK-6050: some YARN setups (e.g. capacity scheduler + DefaultResourceCalculator) hand back
+    // a vcore count that differs from what was asked for. Look requests up with the container's
+    // memory but our own requested vcores, which effectively disables matching on vcores.
+    val allocatedMemory = allocatedContainer.getResource.getMemory
+    val resourceToMatch = Resource.newInstance(allocatedMemory, resource.getVirtualCores)
+    val candidates =
+      amClient.getMatchingRequests(allocatedContainer.getPriority, location, resourceToMatch)
+
+    if (candidates.isEmpty) {
+      remaining += allocatedContainer
+    } else {
+      // Consume the first request of the first matching group.
+      val satisfiedRequest = candidates.get(0).iterator.next
+      amClient.removeContainerRequest(satisfiedRequest)
+      containersToUse += allocatedContainer
+    }
+  }
+
+  /**
+   * Launches executors in the allocated containers.
+   *
+   * Every container consumes a fresh executor id from `executorIdCounter`, whether or not an
+   * executor ends up being launched in it. While the running count is below the target, the
+   * launch is submitted to `launcherPool`; the allocator's bookkeeping is updated only after
+   * `ExecutorRunnable.run()` returns. If the launch fails with a non-fatal error the container is
+   * handed back to the RM. When `launchContainers` is false (tests), only the bookkeeping is
+   * updated.
+   *
+   * @param containersToUse containers that were matched to outstanding requests
+   */
+  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+    for (container <- containersToUse) {
+      executorIdCounter += 1
+      val executorHostname = container.getNodeId.getHost
+      val containerId = container.getId
+      val executorId = executorIdCounter.toString
+      assert(container.getResource.getMemory >= resource.getMemory)
+      logInfo(s"Launching container $containerId on host $executorHostname")
+
+      // Records the new executor in all lookup tables; takes the allocator lock because it may
+      // run on a launcher-pool thread.
+      def updateInternalState(): Unit = synchronized {
+        numExecutorsRunning += 1
+        executorIdToContainer(executorId) = container
+        containerIdToExecutorId(container.getId) = executorId
+
+        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+          new HashSet[ContainerId])
+        containerSet += containerId
+        allocatedContainerToHostMap.put(containerId, executorHostname)
+      }
+
+      // NOTE(review): numExecutorsRunning is read here outside of the synchronized block used by
+      // updateInternalState(), and it only grows once a launch has completed - confirm callers
+      // hold the allocator lock and that over-launching within one batch is acceptable.
+      if (numExecutorsRunning < targetNumExecutors) {
+        if (launchContainers) {
+          launcherPool.execute(new Runnable {
+            override def run(): Unit = {
+              try {
+                new ExecutorRunnable(
+                  Some(container),
+                  conf,
+                  sparkConf,
+                  driverUrl,
+                  executorId,
+                  executorHostname,
+                  executorMemory,
+                  executorCores,
+                  appAttemptId.getApplicationId.toString,
+                  securityMgr,
+                  localResources
+                ).run()
+                updateInternalState()
+              } catch {
+                case NonFatal(e) =>
+                  logError(s"Failed to launch executor $executorId on container $containerId", e)
+                  // Assigned container should be released immediately to avoid unnecessary resource
+                  // occupation.
+                  amClient.releaseAssignedContainer(containerId)
+              }
+            }
+          })
+        } else {
+          // For test only
+          updateInternalState()
+        }
+      } else {
+        // NOTE(review): the container is neither tracked nor released on this path - confirm it
+        // is reclaimed elsewhere.
+        logInfo(("Skip launching executorRunnable as running Executors count: %d " +
+          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
+      }
+    }
+  }
+
+  /**
+   * Handles containers that have completed.
+   *
+   * For each completed container this derives an `ExecutorExited` loss reason, drops the
+   * container from the host/container and container/executor lookup tables, answers (or stores
+   * an answer for) loss-reason queries about its executor, and - when the container was not
+   * released by this allocator - asks the driver to remove the executor.
+   *
+   * Visible for testing.
+   *
+   * @param completedContainers statuses of the containers that have completed
+   */
+  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+    for (completedContainer <- completedContainers) {
+      val containerId = completedContainer.getContainerId
+      // Result of removing the id from releasedContainers, i.e. whether internalReleaseContainer
+      // had recorded this container as released by us.
+      val alreadyReleased = releasedContainers.remove(containerId)
+      val hostOpt = allocatedContainerToHostMap.get(containerId)
+      val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
+      val exitReason = if (!alreadyReleased) {
+        // Decrement the number of executors running. The next iteration of
+        // the ApplicationMaster's reporting thread will take care of allocating.
+        numExecutorsRunning -= 1
+        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
+          containerId,
+          onHostStr,
+          completedContainer.getState,
+          completedContainer.getExitStatus))
+        // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+        // there are some exit status' we shouldn't necessarily count against us, but for
+        // now I think its ok as none of the containers are expected to exit.
+        val exitStatus = completedContainer.getExitStatus
+        // exitCausedByApp decides both the log level below and the flag reported in the
+        // ExecutorExited reason.
+        val (exitCausedByApp, containerExitReason) = exitStatus match {
+          case ContainerExitStatus.SUCCESS =>
+            (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
+              "pre-emption) and not because of an error in the running job.")
+          case ContainerExitStatus.PREEMPTED =>
+            // Preemption is not the fault of the running tasks, since YARN preempts containers
+            // merely to do resource sharing, and tasks that fail due to preempted executors could
+            // just as easily finish on any other executor. See SPARK-8167.
+            (false, s"Container ${containerId}${onHostStr} was preempted.")
+          // Should probably still count memory exceeded exit codes towards task failures
+          case VMEM_EXCEEDED_EXIT_CODE =>
+            (true, memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              VMEM_EXCEEDED_PATTERN))
+          case PMEM_EXCEEDED_EXIT_CODE =>
+            (true, memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              PMEM_EXCEEDED_PATTERN))
+          case _ =>
+            // Enqueue the timestamp of failed executor
+            // (only this catch-all case records a failure timestamp; the memory-limit cases
+            // above do not).
+            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
+            (true, "Container marked as failed: " + containerId + onHostStr +
+              ". Exit status: " + completedContainer.getExitStatus +
+              ". Diagnostics: " + completedContainer.getDiagnostics)
+
+        }
+        if (exitCausedByApp) {
+          logWarning(containerExitReason)
+        } else {
+          logInfo(containerExitReason)
+        }
+        ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
+      } else {
+        // If we have already released this container, then it must mean
+        // that the driver has explicitly requested it to be killed
+        ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
+          s"Container $containerId exited from explicit termination request.")
+      }
+
+      // Drop the container from the host <-> container tables. Nothing happens here when the
+      // container has no recorded host or the host has no recorded container set.
+      for {
+        host <- hostOpt
+        containerSet <- allocatedHostToContainersMap.get(host)
+      } {
+        containerSet.remove(containerId)
+        if (containerSet.isEmpty) {
+          allocatedHostToContainersMap.remove(host)
+        } else {
+          // NOTE(review): containerSet was obtained from this same map entry, so this update
+          // looks redundant - confirm before removing.
+          allocatedHostToContainersMap.update(host, containerSet)
+        }
+
+        allocatedContainerToHostMap.remove(containerId)
+      }
+
+      // Only containers that were mapped to an executor id reach the block below.
+      containerIdToExecutorId.remove(containerId).foreach { eid =>
+        executorIdToContainer.remove(eid)
+        pendingLossReasonRequests.remove(eid) match {
+          case Some(pendingRequests) =>
+            // Notify application of executor loss reasons so it can decide whether it should abort
+            pendingRequests.foreach(_.reply(exitReason))
+
+          case None =>
+            // We cannot find executor for pending reasons. This is because completed container
+            // is processed before querying pending result. We should store it for later query.
+            // This is usually happened when explicitly killing a container, the result will be
+            // returned in one AM-RM communication. So query RPC will be later than this completed
+            // container process.
+            releasedExecutorLossReasons.put(eid, exitReason)
+        }
+        if (!alreadyReleased) {
+          // The executor could have gone away (like no route to host, node failure, etc)
+          // Notify backend about the failure of the executor
+          numUnexpectedContainerRelease += 1
+          driverRef.send(RemoveExecutor(eid, exitReason))
+        }
+      }
+    }
+  }
+
+  /**
+   * Records that an RpcCallContext wants to know why executor `eid` was lost.
+   *
+   * If the executor is still tracked, the request is queued; note that the loss reason can then
+   * only be sent back in the next call to allocateResources(). If a loss reason was already
+   * stored for the executor, it is consumed and sent immediately. Otherwise the caller gets a
+   * failure, because the executor is unknown.
+   */
+  private[yarn] def enqueueGetLossReasonRequest(
+      eid: String,
+      context: RpcCallContext): Unit = synchronized {
+    if (executorIdToContainer.contains(eid)) {
+      val waiters = pendingLossReasonRequests.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext])
+      waiters += context
+    } else {
+      releasedExecutorLossReasons.remove(eid) match {
+        case Some(storedReason) =>
+          // Executor is already released explicitly before getting the loss reason, so directly
+          // send the pre-stored lost reason
+          context.reply(storedReason)
+
+        case None =>
+          logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+          context.sendFailure(
+            new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
+      }
+    }
+  }
+
+  // Remembers the container as released by us (so its later completion is not treated as an
+  // unexpected loss) and hands it back to the RM.
+  private def internalReleaseContainer(container: Container): Unit = {
+    val containerId = container.getId()
+    releasedContainers.add(containerId)
+    amClient.releaseAssignedContainer(containerId)
+  }
+
+  // Accessor for the numUnexpectedContainerRelease counter, which processCompletedContainers
+  // bumps for every completed container that was still mapped to an executor and had not been
+  // released by this allocator.
+  private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+
+  // Number of executor ids that currently have loss-reason requests queued in
+  // pendingLossReasonRequests; read while holding the allocator's lock.
+  private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
+    pendingLossReasonRequests.size
+  }
+
+  /**
+   * Partitions the pending container requests into three groups according to how their node
+   * constraints relate to the hosts currently preferred by pending tasks.
+   *
+   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
+   *                             container placement hint; only its keys are consulted here.
+   * @param pendingAllocations A sequence of pending allocation container request.
+   * @return A tuple of 3 sequences, in input order: requests naming at least one preferred host
+   *         (locality matched), requests naming only other hosts (locality unmatched), and
+   *         requests with no node constraint at all (locality free).
+   */
+  private def splitPendingAllocationsByLocality(
+      hostToLocalTaskCount: Map[String, Int],
+      pendingAllocations: Seq[ContainerRequest]
+    ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
+    val matched = ArrayBuffer[ContainerRequest]()
+    val unmatched = ArrayBuffer[ContainerRequest]()
+    val unconstrained = ArrayBuffer[ContainerRequest]()
+
+    val preferredHosts = hostToLocalTaskCount.keySet
+    pendingAllocations.foreach { request =>
+      Option(request.getNodes) match {
+        case None =>
+          unconstrained += request
+        case Some(nodes) if nodes.asScala.exists(preferredHosts.contains) =>
+          matched += request
+        case Some(_) =>
+          unmatched += request
+      }
+    }
+
+    (matched.toSeq, unmatched.toSeq, unconstrained.toSeq)
+  }
+
+}
+
+// Constants and helpers for recognising containers that YARN killed for exceeding their
+// memory limits.
+private object YarnAllocator {
+  // Matches a single memory amount as it appears in the diagnostics, e.g. "2.1 GB".
+  val MEM_REGEX = "[0-9.]+ [KMG]B"
+  val PMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
+  val VMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+  val VMEM_EXCEEDED_EXIT_CODE = -103
+  val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  /**
+   * Builds the log message for a container killed for exceeding a memory limit.
+   *
+   * @param diagnostics diagnostics text reported for the container
+   * @param pattern pattern locating the "X of Y ... memory used" fragment in the diagnostics
+   * @return the message, including the first matching usage fragment when one is found
+   */
+  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
+    val usageMatcher = pattern.matcher(diagnostics)
+    val usage = if (usageMatcher.find()) s" ${usageMatcher.group()}." else ""
+    s"Container killed by YARN for exceeding memory limits.$usage" +
+      " Consider boosting spark.yarn.executor.memoryOverhead."
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
new file mode 100644
index 0000000..53df11e
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.util.Utils
+
+/**
+ * Wraps the AM-RM client used to register the application master with the YARN ResourceManager
+ * and to unregister it again.
+ */
+private[spark] class YarnRMClient extends Logging {
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+  private var uiHistoryAddress: String = _
+  private var registered: Boolean = false
+
+  /**
+   * Creates and starts the AM-RM client, registers the application master with the RM and
+   * returns a YarnAllocator bound to that client.
+   *
+   * @param driverUrl URL of the driver, passed through to the allocator.
+   * @param driverRef RPC reference to the driver, passed through to the allocator.
+   * @param conf The Yarn configuration.
+   * @param sparkConf The Spark configuration.
+   * @param uiAddress Address of the SparkUI.
+   * @param uiHistoryAddress Address of the application on the History Server.
+   * @param securityMgr The security manager.
+   * @param localResources Map with information about files distributed via YARN's cache.
+   */
+  def register(
+      driverUrl: String,
+      driverRef: RpcEndpointRef,
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      uiAddress: String,
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager,
+      localResources: Map[String, LocalResource]
+    ): YarnAllocator = {
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(conf)
+    amClient.start()
+    // Remembered so that unregister() can report it to the RM later.
+    this.uiHistoryAddress = uiHistoryAddress
+
+    logInfo("Registering the ApplicationMaster")
+    synchronized {
+      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+      registered = true
+    }
+
+    val attemptId = getAttemptId()
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, attemptId, securityMgr,
+      localResources)
+  }
+
+  /**
+   * Unregister the AM. Guaranteed to only be called once. Does nothing if register() never
+   * completed the registration.
+   *
+   * @param status The final status of the AM.
+   * @param diagnostics Diagnostics message to include in the final status.
+   */
+  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
+    if (registered) {
+      amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+    }
+  }
+
+  /** Returns the attempt ID, derived from the container ID of the current container. */
+  def getAttemptId(): ApplicationAttemptId = {
+    val currentContainerId = YarnSparkHadoopUtil.get.getContainerId
+    currentContainerId.getApplicationAttemptId()
+  }
+
+  /**
+   * Returns the configuration for the AmIpFilter to add to the Spark UI.
+   *
+   * @param conf The Yarn configuration.
+   * @param proxyBase Path appended to each proxy address to form the proxy URI base.
+   * @return PROXY_HOSTS / PROXY_URI_BASES (comma-separated) when the HA-aware YARN API is
+   *         available, otherwise the single-valued PROXY_HOST / PROXY_URI_BASE.
+   */
+  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
+    val webAppUtils = classOf[WebAppUtils]
+
+    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+    // so not all stable releases have it; fall back to plain http when the lookup fails.
+    val scheme = Try {
+      webAppUtils.getMethod("getHttpSchemePrefix", classOf[Configuration])
+        .invoke(null, conf).asInstanceOf[String]
+    }.getOrElse("http://")
+
+    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+    try {
+      val haAwareLookup =
+        webAppUtils.getMethod("getProxyHostsAndPortsForAmFilter", classOf[Configuration])
+      val proxyAddresses = haAwareLookup.invoke(null, conf).asInstanceOf[JList[String]].asScala
+      val proxyHosts = proxyAddresses.map(_.split(":")(0))
+      val proxyUriBases = proxyAddresses.map(address => scheme + address + proxyBase)
+      Map(
+        "PROXY_HOSTS" -> proxyHosts.mkString(","),
+        "PROXY_URI_BASES" -> proxyUriBases.mkString(","))
+    } catch {
+      case _: NoSuchMethodException =>
+        // Older Yarn: only a single proxy address is available.
+        val proxyAddress = WebAppUtils.getProxyHostAndPort(conf)
+        val proxyHost = proxyAddress.split(":")(0)
+        Map("PROXY_HOST" -> proxyHost, "PROXY_URI_BASE" -> (scheme + proxyAddress + proxyBase))
+    }
+  }
+
+  /**
+   * Returns the maximum number of attempts to register the AM: the Spark setting when it is
+   * present and does not exceed the YARN limit, otherwise the YARN limit.
+   */
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    val sparkLimit = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
+    val yarnLimit = yarnConf.getInt(
+      YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+    sparkLimit.map(configured => math.min(configured, yarnLimit)).getOrElse(yarnLimit)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
new file mode 100644
index 0000000..cc53b1b
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.regex.Matcher
+import java.util.regex.Pattern
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.util.Utils
+
+/**
+ * YARN flavour of SparkHadoopUtil: credentials are read from and added to the current Hadoop
+ * user, and new configurations are wrapped in a YarnConfiguration.
+ */
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
+
+  private var credentialUpdater: CredentialUpdater = _
+
+  /** Adds every credential held by `source` to `dest`. */
+  override def transferCredentials(
+      source: UserGroupInformation,
+      dest: UserGroupInformation): Unit = {
+    val sourceCredentials = source.getCredentials()
+    dest.addCredentials(sourceCredentials)
+  }
+
+  // Note that all params which start with SPARK are propagated all the way through, so if in yarn
+  // mode, this MUST be set to true.
+  override def isYarnMode(): Boolean = true
+
+  // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop
+  // subsystems. Always create a new config, don't reuse yarnConf.
+  override def newConfiguration(conf: SparkConf): Configuration = {
+    val baseConf = super.newConfiguration(conf)
+    new YarnConfiguration(baseConf)
+  }
+
+  /**
+   * Merges the current user's credentials into the job conf; they are necessary for running on
+   * a secure Hadoop cluster.
+   */
+  override def addCredentials(conf: JobConf): Unit = {
+    val jobCredentials = conf.getCredentials()
+    val userCredentials = UserGroupInformation.getCurrentUser().getCredentials()
+    jobCredentials.mergeAll(userCredentials)
+  }
+
+  override def getCurrentUserCredentials(): Credentials = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    currentUser.getCredentials()
+  }
+
+  override def addCurrentUserCredentials(creds: Credentials): Unit = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    currentUser.addCredentials(creds)
+  }
+
+  /** Stores `secret` (UTF-8 encoded) under `key` in the current user's credentials. */
+  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit = {
+    val secretHolder = new Credentials()
+    secretHolder.addSecretKey(new Text(key), secret.getBytes(UTF_8))
+    addCurrentUserCredentials(secretHolder)
+  }
+
+  /**
+   * Looks up the secret stored under `key` in the current user's credentials; yields null when
+   * the user has no credentials object.
+   */
+  override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+    Option(getCurrentUserCredentials()).map(_.getSecretKey(new Text(key))).orNull
+  }
+
+  private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = {
+    val hadoopConf = newConfiguration(sparkConf)
+    credentialUpdater =
+      new ConfigurableCredentialManager(sparkConf, hadoopConf).credentialUpdater()
+    credentialUpdater.start()
+  }
+
+  // Stops the updater if one was started and forgets it, so a repeated call is a no-op.
+  private[spark] override def stopCredentialUpdater(): Unit = {
+    Option(credentialUpdater).foreach { updater =>
+      updater.stop()
+      credentialUpdater = null
+    }
+  }
+
+  // Parses the container ID from the CONTAINER_ID environment variable of this process.
+  private[spark] def getContainerId: ContainerId = {
+    val containerIdVar = ApplicationConstants.Environment.CONTAINER_ID.name()
+    val containerIdString = System.getenv(containerIdVar)
+    ConverterUtils.toContainerId(containerIdString)
+  }
+}
+
+/**
+ * Constants and stateless helpers shared by the YARN client, application master and allocator.
+ */
+object YarnSparkHadoopUtil {
+  // Additional memory overhead.
+  // 10% was arrived at experimentally, in the interest of minimizing memory waste while covering
+  // the common cases. Memory overhead tends to grow with container size.
+  val MEMORY_OVERHEAD_FACTOR = 0.10
+  val MEMORY_OVERHEAD_MIN = 384L
+
+  val ANY_HOST = "*"
+
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
+  // All RM requests are issued with same priority : we do not (yet) have any distinction between
+  // request types (like map/reduce in hadoop for example)
+  val RM_REQUEST_PRIORITY = Priority.newInstance(1)
+
+  /**
+   * Returns the active SparkHadoopUtil as a YarnSparkHadoopUtil.
+   *
+   * YARN mode is read from the SPARK_YARN_MODE system property, falling back to the environment
+   * variable of the same name.
+   *
+   * @throws SparkException if YARN mode is not enabled
+   */
+  def get: YarnSparkHadoopUtil = {
+    val yarnMode = java.lang.Boolean.parseBoolean(
+      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if (!yarnMode) {
+      throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!")
+    }
+    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
+  }
+
+  /**
+   * Add a path variable to the given environment map.
+   * If the map already contains this key, append the value to the existing value instead,
+   * separated by the class path separator.
+   */
+  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+    val newValue = env.get(key).map(_ + getClassPathSeparator() + value).getOrElse(value)
+    env.put(key, newValue)
+  }
+
+  /**
+   * Set zero or more environment variables specified by the given input string.
+   * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
+   *
+   * Each entry is split on its first '=' only, so values may themselves contain '='
+   * (e.g. "JAVA_OPTS=-Dfoo=bar") and may be empty ("KEY="). References to other variables inside
+   * a value are expanded from `env` first, then from the process environment, and otherwise
+   * replaced by the empty string. An entry without any '=' still fails with
+   * ArrayIndexOutOfBoundsException. A null or empty input string is a no-op.
+   */
+  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
+    if (inputString != null && inputString.length() > 0) {
+      val childEnvs = inputString.split(",")
+      val p = Pattern.compile(environmentVariableRegex)
+      for (cEnv <- childEnvs) {
+        // Limit of 2: split on the first '=' only and keep a trailing empty value.
+        val parts = cEnv.split("=", 2)
+        val m = p.matcher(parts(1))
+        val sb = new StringBuffer
+        while (m.find()) {
+          val variable = m.group(1)
+          // Prefer the value configured for the child; if this key is not configured there, get
+          // it from the env; if the env key is not present anywhere, simply use "".
+          val replace = env.get(variable)
+            .orElse(Option(System.getenv(variable)))
+            .getOrElse("")
+          m.appendReplacement(sb, Matcher.quoteReplacement(replace))
+        }
+        m.appendTail(sb)
+        // This treats the environment variable as path variable delimited by `File.pathSeparator`
+        // This is kept for backward compatibility and consistency with Hadoop's behavior
+        addPathToEnvironment(env, parts(0), sb.toString)
+      }
+    }
+  }
+
+  // Pattern for a variable reference inside a value: %NAME% on Windows, $NAME elsewhere. Group 1
+  // captures the variable name.
+  private val environmentVariableRegex: String = {
+    if (Utils.isWindows) {
+      "%([A-Za-z_][A-Za-z0-9_]*?)%"
+    } else {
+      "\\$([A-Za-z_][A-Za-z0-9_]*)"
+    }
+  }
+
+  /**
+   * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+   * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
+   * an inconsistent state.
+   * TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+   * 'something' to fail job ... akin to blacklisting trackers in mapred ?
+   *
+   * The handler if an OOM Exception is thrown by the JVM must be configured on Windows
+   * differently: the 'taskkill' command should be used, whereas Unix-based systems use 'kill'.
+   *
+   * As the JVM interprets both %p and %%p as the same, we can use either of them. However,
+   * some tests on Windows computers suggest, that the JVM only accepts '%%p'.
+   *
+   * Furthermore, the behavior of the character '%' on the Windows command line differs from
+   * the behavior of '%' in a .cmd file: it gets interpreted as an incomplete environment
+   * variable. Windows .cmd files escape a '%' by '%%'. Thus, the correct way of writing
+   * '%%p' in an escaped way is '%%%%p'.
+   *
+   * Nothing is added when `javaOpts` already contains an -XX:OnOutOfMemoryError option.
+   */
+  private[yarn] def addOutOfMemoryErrorArgument(javaOpts: ListBuffer[String]): Unit = {
+    if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) {
+      if (Utils.isWindows) {
+        javaOpts += escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
+      } else {
+        javaOpts += "-XX:OnOutOfMemoryError='kill %p'"
+      }
+    }
+  }
+
+  /**
+   * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
+   * using either
+   *
+   * (Unix-based) `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work.
+   * The argument is enclosed in single quotes and some key characters are escaped.
+   *
+   * (Windows-based) part of a .cmd file in which case windows escaping for each argument must be
+   * applied. Windows is quite lenient, however it is usually Java that causes trouble, needing to
+   * distinguish between arguments starting with '-' and class names. If arguments are surrounded
+   * by ' java takes the following string as is, hence an argument is mistakenly taken as a class
+   * name which happens to start with a '-'. The way to avoid this, is to surround nothing with
+   * a ', but instead with a ".
+   *
+   * @param arg A single argument.
+   * @return Argument quoted for execution via Yarn's generated shell script; null when `arg` is
+   *         null.
+   */
+  def escapeForShell(arg: String): String = {
+    if (arg != null) {
+      if (Utils.isWindows) {
+        YarnCommandBuilderUtils.quoteForBatchScript(arg)
+      } else {
+        val escaped = new StringBuilder("'")
+        arg.foreach {
+          case '$' => escaped.append("\\$")
+          case '"' => escaped.append("\\\"")
+          case '\'' => escaped.append("'\\''")
+          case c => escaped.append(c)
+        }
+        escaped.append("'").toString()
+      }
+    } else {
+      arg
+    }
+  }
+
+  // YARN/Hadoop acls are specified as user1,user2 group1,group2
+  // Users and groups are separated by a space and hence we need to pass the acls in same format
+  def getApplicationAclsForYarn(securityMgr: SecurityManager)
+      : Map[ApplicationAccessType, String] = {
+    Map[ApplicationAccessType, String] (
+      ApplicationAccessType.VIEW_APP -> (securityMgr.getViewAcls + " " +
+        securityMgr.getViewAclsGroups),
+      ApplicationAccessType.MODIFY_APP -> (securityMgr.getModifyAcls + " " +
+        securityMgr.getModifyAclsGroups)
+    )
+  }
+
+  // Resolved reflectively: Environment.$$() when it exists, otherwise Environment.$().
+  private lazy val expandMethod =
+    Try(classOf[Environment].getMethod("$$"))
+      .getOrElse(classOf[Environment].getMethod("$"))
+
+  /**
+   * Expand environment variable using Yarn API.
+   * If environment.$$() is implemented, return the result of it.
+   * Otherwise, return the result of environment.$()
+   * Note: $$() is added in Hadoop 2.4.
+   */
+  def expandEnvironment(environment: Environment): String =
+    expandMethod.invoke(environment).asInstanceOf[String]
+
+  // Resolved reflectively: ApplicationConstants.CLASS_PATH_SEPARATOR when it exists, otherwise
+  // File.pathSeparator.
+  private lazy val classPathSeparatorField =
+    Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
+      .getOrElse(classOf[File].getField("pathSeparator"))
+
+  /**
+   * Get class path separator using Yarn API.
+   * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
+   * Otherwise, return File.pathSeparator
+   * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4.
+   */
+  def getClassPathSeparator(): String = {
+    classPathSeparatorField.get(null).asInstanceOf[String]
+  }
+
+  /**
+   * Getting the initial target number of executors depends on whether dynamic allocation is
+   * enabled.
+   * With dynamic allocation, it is the configured initial executor count, which must lie between
+   * the configured minimum and maximum.
+   * If not using dynamic allocation it gets the number of executors requested by the user: the
+   * EXECUTOR_INSTANCES config entry if set, else the SPARK_EXECUTOR_INSTANCES environment
+   * variable, else `numExecutors`.
+   */
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
+      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must between min executor number " +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      val targetNumExecutors =
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
+      // System property can override environment variable.
+      conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
new file mode 100644
index 0000000..666cb45
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+package object config { // ConfigBuilder entries used by the YARN deploy code.
+
+  /* Common app configuration. */
+
+  private[spark] val APPLICATION_TAGS = ConfigBuilder("spark.yarn.tags")
+    .doc("Comma-separated list of strings to pass through as YARN application tags appearing " +
+      "in YARN Application Reports, which can be used for filtering when querying YARN.")
+    .stringConf
+    .toSequence
+    .createOptional
+
+  private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
+      .doc("Interval after which AM failures will be considered independent and " +
+        "not accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[spark] val AM_PORT =
+    ConfigBuilder("spark.yarn.am.port")
+      .intConf
+      .createWithDefault(0)
+
+  private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+      .doc("Interval after which Executor failures will be considered independent and not " +
+        "accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
+    .doc("Maximum number of AM attempts before failing the app.")
+    .intConf
+    .createOptional
+
+  private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first")
+    .doc("Whether to place user jars in front of Spark's classpath.")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath")
+    .doc("Root of configuration paths that is present on gateway nodes, and will be replaced " +
+      "with the corresponding path in cluster machines.")
+    .stringConf
+    .createWithDefault(null) // NOTE(review): null default; confirm callers null-check
+
+  private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath")
+    .doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " +
+      "in the YARN cluster.")
+    .stringConf
+    .createWithDefault(null) // NOTE(review): null default; confirm callers null-check
+
+  private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue")
+    .stringConf
+    .createWithDefault("default")
+
+  private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address")
+    .stringConf
+    .createOptional
+
+  /* File distribution. */
+
+  private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
+    .doc("Location of archive containing jars files with Spark classes.")
+    .stringConf
+    .createOptional
+
+  private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
+    .doc("Location of jars containing Spark classes.")
+    .stringConf
+    .toSequence
+    .createOptional
+
+  private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
+    .doc("Whether to preserve temporary files created by the job in HDFS.")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication")
+    .doc("Replication factor for files uploaded by Spark to HDFS.")
+    .intConf
+    .createOptional
+
+  private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
+    .doc("Staging directory used while submitting applications.")
+    .stringConf
+    .createOptional
+
+  /* Cluster-mode launcher configuration. */
+
+  private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
+    .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
+      "launcher process.")
+    .booleanConf
+    .createWithDefault(true)
+
+  private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
+    .doc("Interval between reports of the current app status in cluster mode.")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("1s")
+
+  /* Shared Client-mode AM / Driver configuration. */
+
+  private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("100s")
+
+  private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
+    .doc("Node label expression for the AM.")
+    .stringConf
+    .createOptional
+
+  private[spark] val CONTAINER_LAUNCH_MAX_THREADS =
+    ConfigBuilder("spark.yarn.containerLauncherMaxThreads")
+      .intConf
+      .createWithDefault(25)
+
+  private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures")
+    .intConf
+    .createOptional
+
+  private[spark] val MAX_REPORTER_THREAD_FAILURES =
+    ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
+      .intConf
+      .createWithDefault(5)
+
+  private[spark] val RM_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms")
+      .timeConf(TimeUnit.MILLISECONDS) // takes unit suffixes (default "3s") despite "-ms" key
+      .createWithDefaultString("3s")
+
+  private[spark] val INITIAL_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("200ms")
+
+  private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
+    .doc("A comma-separated list of class names of services to add to the scheduler.")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  /* Client-mode AM configuration. */
+
+  private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")
+    .intConf
+    .createWithDefault(1)
+
+  private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions")
+    .doc("Extra Java options for the client-mode AM.")
+    .stringConf
+    .createOptional
+
+  private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath")
+    .doc("Extra native library path for the client-mode AM.")
+    .stringConf
+    .createOptional
+
+  private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+
+  private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory")
+    .bytesConf(ByteUnit.MiB)
+    .createWithDefaultString("512m")
+
+  /* Driver configuration. */
+
+  private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
+    .intConf
+    .createWithDefault(1)
+
+  private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+
+  /* Executor configuration. */
+
+  private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
+    .intConf
+    .createWithDefault(1)
+
+  private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+
+  private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
+    ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
+      .doc("Node label expression for executors.")
+      .stringConf
+      .createOptional
+
+  /* Security configuration. */
+
+  private[spark] val CREDENTIAL_FILE_MAX_COUNT =
+    ConfigBuilder("spark.yarn.credentials.file.retention.count")
+      .intConf
+      .createWithDefault(5)
+
+  private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
+    ConfigBuilder("spark.yarn.credentials.file.retention.days")
+      .intConf
+      .createWithDefault(5)
+
+  private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
+    .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
+      "fs.defaultFS does not need to be listed here.")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  /* Rolled log aggregation configuration. */
+
+  private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
+    ConfigBuilder("spark.yarn.rolledLog.includePattern")
+      .doc("Java Regex to filter the log files which match the defined include pattern and those " +
+        "log files will be aggregated in a rolling fashion.")
+      .stringConf
+      .createOptional
+
+  private[spark] val ROLLED_LOG_EXCLUDE_PATTERN =
+    ConfigBuilder("spark.yarn.rolledLog.excludePattern")
+      .doc("Java Regex to filter the log files which match the defined exclude pattern and those " +
+        "log files will not be aggregated in a rolling fashion.")
+      .stringConf
+      .createOptional
+
+  /* Private configs (marked internal()). */
+
+  private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
+    .internal()
+    .stringConf
+    .createWithDefault(null) // NOTE(review): null default; confirm callers null-check
+
+  // Internal config to propagate the location of the user's jar to the driver/executors
+  private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
+    .internal()
+    .stringConf
+    .createOptional
+
+  // Internal config to propagate the locations of any extra jars to add to the classpath
+  // of the executors
+  private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars")
+    .internal()
+    .stringConf
+    .toSequence
+    .createOptional
+
+  /* Configuration and cached file propagation. */
+
+  private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames")
+    .internal()
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes")
+    .internal()
+    .longConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps")
+    .internal()
+    .longConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities")
+    .internal()
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  // One value per cached file: either "file" or "archive".
+  private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types")
+    .internal()
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  // The location of the conf archive in HDFS.
+  private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive")
+    .internal()
+    .stringConf
+    .createOptional
+
+  private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime")
+    .internal()
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefault(Long.MaxValue)
+
+  private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime")
+    .internal()
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefault(Long.MaxValue)
+
+  // The list of cache-related config entries. This is used by Client and the AM to clean
+  // up the environment so that these settings do not appear on the web UI.
+  private[yarn] val CACHE_CONFIGS = Seq(
+    CACHED_FILES,
+    CACHED_FILES_SIZES,
+    CACHED_FILES_TIMESTAMPS,
+    CACHED_FILES_VISIBILITIES,
+    CACHED_FILES_TYPES,
+    CACHED_CONF_ARCHIVE)
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org